This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new be5e847  [functions] change instance id from string to int and expose 
number of instances in context (#2411)
be5e847 is described below

commit be5e847a18c12453c6a98b93eb1bb3da6eeb70b8
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Wed Sep 26 10:11:33 2018 -0700

    [functions] change instance id from string to int and expose number of 
instances in context (#2411)
    
    * [functions] change instance id from string to int and expose number of 
instances in context
    
     ### Motivation
    
    When writing a connector reading from a list of sources, it is hard for the 
connector implementation to decide how
    to distribute the list of sources across the function instances. because 
there is no way to tell how many function
    instances is running.
    
     ### Changes
    
    - change instance id from string to integer (since the implementation is 
already assuming instance id is an integer)
    - add getNumInstances in the context
    - expose both interfaces in source and sink connector context
    
    * Fix compilation
---
 .../java/org/apache/pulsar/admin/cli/CmdFunctions.java    |  2 +-
 .../java/org/apache/pulsar/functions/api/Context.java     |  9 ++++++++-
 .../org/apache/pulsar/functions/instance/ContextImpl.java |  9 +++++++--
 .../apache/pulsar/functions/instance/InstanceConfig.java  | 11 ++++++++++-
 .../pulsar/functions/instance/JavaInstanceRunnable.java   | 15 +++++++--------
 .../apache/pulsar/functions/windowing/WindowContext.java  |  2 +-
 .../pulsar/functions/windowing/WindowContextImpl.java     |  2 +-
 .../apache/pulsar/functions/runtime/JavaInstanceMain.java |  2 +-
 .../apache/pulsar/functions/runtime/ProcessRuntime.java   |  2 +-
 .../apache/pulsar/functions/runtime/RuntimeSpawner.java   |  4 ++--
 .../pulsar/functions/runtime/ProcessRuntimeTest.java      |  2 +-
 .../apache/pulsar/functions/worker/FunctionActioner.java  |  2 +-
 .../main/java/org/apache/pulsar/io/core/SinkContext.java  | 15 +++++++++++++++
 .../java/org/apache/pulsar/io/core/SourceContext.java     | 15 +++++++++++++++
 14 files changed, 71 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index b176fe1..395e881 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1258,7 +1258,7 @@ public class CmdFunctions extends CmdBase {
                 // TODO: correctly implement function version and id
                 
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
                 instanceConfig.setFunctionId(UUID.randomUUID().toString());
-                instanceConfig.setInstanceId(Integer.toString(i + 
instanceIdOffset));
+                instanceConfig.setInstanceId(i + instanceIdOffset);
                 instanceConfig.setMaxBufferedTuples(1024);
                 instanceConfig.setPort(Utils.findAvailablePort());
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 2856b7c..c66ea6e 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -86,7 +86,14 @@ public interface Context {
      *
      * @return the instance id
      */
-    String getInstanceId();
+    int getInstanceId();
+
+    /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
 
     /**
      * The version of the function that we are executing
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index c4099f4..4d47433 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -177,8 +177,13 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     }
 
     @Override
-    public String getInstanceId() {
-        return config.getInstanceId().toString();
+    public int getInstanceId() {
+        return config.getInstanceId();
+    }
+
+    @Override
+    public int getNumInstances() {
+        return config.getFunctionDetails().getParallelism();
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 9f9da79..040af91 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -35,10 +35,19 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
 @EqualsAndHashCode
 @ToString
 public class InstanceConfig {
-    private String instanceId;
+    private int instanceId;
     private String functionId;
     private String functionVersion;
     private FunctionDetails functionDetails;
     private int maxBufferedTuples;
     private int port;
+
+    /**
+     * Get the string representation of {@link #getInstanceId()}.
+     *
+     * @return the string representation of {@link #getInstanceId()}.
+     */
+    public String getInstanceName() {
+        return "" + instanceId;
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 32e878d..0f8f6ca 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -51,7 +51,6 @@ import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -65,7 +64,6 @@ import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Build
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
-import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
 import org.apache.pulsar.functions.utils.ConsumerConfig;
@@ -144,7 +142,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         // initialize the thread context
         ThreadContext.put("function", 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
         ThreadContext.put("functionname", 
instanceConfig.getFunctionDetails().getName());
-        ThreadContext.put("instance", instanceConfig.getInstanceId());
+        ThreadContext.put("instance", instanceConfig.getInstanceName());
 
         log.info("Starting Java Instance {} : \n Details = {}",
             instanceConfig.getFunctionDetails().getName(), 
instanceConfig.getFunctionDetails());
@@ -239,17 +237,18 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     }
 
     private void loadJars() throws Exception {
-
         try {
             // Let's first try to treat it as a nar archive
-            
fnCache.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(), 
instanceConfig.getInstanceId(),
-                    jarFile);
+            fnCache.registerFunctionInstanceWithArchive(
+                instanceConfig.getFunctionId(),
+                instanceConfig.getInstanceName(),
+                jarFile);
         } catch (FileNotFoundException e) {
             log.info("For Function {} Loading as NAR failed with {}; treating 
it as Jar instead", instanceConfig, e);
             // create the function class loader
             fnCache.registerFunctionInstance(
                     instanceConfig.getFunctionId(),
-                    instanceConfig.getInstanceId(),
+                    instanceConfig.getInstanceName(),
                     Arrays.asList(jarFile),
                     Collections.emptyList());
         }
@@ -393,7 +392,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         // once the thread quits, clean up the instance
         fnCache.unregisterFunctionInstance(
                 instanceConfig.getFunctionId(),
-                instanceConfig.getInstanceId());
+                instanceConfig.getInstanceName());
         log.info("Unloading JAR files for function {}", instanceConfig);
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
index 1bb54cd..63e395c 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
@@ -40,7 +40,7 @@ public interface WindowContext {
      *
      * @return the instance id
      */
-    String getInstanceId();
+    int getInstanceId();
 
     /**
      * The version of the function that we are executing
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
index c7b3919..e03ed97 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
@@ -42,7 +42,7 @@ public class WindowContextImpl implements WindowContext {
     }
 
     @Override
-    public String getInstanceId() {
+    public int getInstanceId() {
         return this.context.getInstanceId();
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 38a4c28..c18eff5 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -54,7 +54,7 @@ public class JavaInstanceMain implements AutoCloseable {
     protected String jarFile;
 
     @Parameter(names = "--instance_id", description = "Instance Id\n", 
required = true)
-    protected String instanceId;
+    protected int instanceId;
 
     @Parameter(names = "--function_id", description = "Function Id\n", 
required = true)
     protected String functionId;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 044f636..63f6f3f 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -128,7 +128,7 @@ class ProcessRuntime implements Runtime {
             // TODO:- Find a platform independent way of controlling memory 
for a python application
         }
         args.add("--instance_id");
-        args.add(instanceConfig.getInstanceId());
+        args.add(instanceConfig.getInstanceName());
         args.add("--function_id");
         args.add(instanceConfig.getFunctionId());
         args.add("--function_version");
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 7049300..030a5a7 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -106,8 +106,8 @@ public class RuntimeSpawner implements AutoCloseable {
 
     public CompletableFuture<FunctionStatus> getFunctionStatus() {
         return runtime.getFunctionStatus().thenApply(f -> {
-           FunctionStatus.Builder builder = FunctionStatus.newBuilder();
-           
builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceId());
+            FunctionStatus.Builder builder = FunctionStatus.newBuilder();
+            
builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceName());
             if (!f.getRunning() && runtimeDeathException != null) {
                 
builder.setFailureException(runtimeDeathException.getMessage());
             }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 97881a4..2345783 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -104,7 +104,7 @@ public class ProcessRuntimeTest {
         config.setFunctionDetails(createFunctionDetails(runtime));
         config.setFunctionId(java.util.UUID.randomUUID().toString());
         config.setFunctionVersion("1.0");
-        config.setInstanceId(java.util.UUID.randomUUID().toString());
+        config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
 
         return config;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 724d80c..a3355b0 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -165,7 +165,7 @@ public class FunctionActioner implements AutoCloseable {
         // TODO: set correct function id and version when features implemented
         instanceConfig.setFunctionId(UUID.randomUUID().toString());
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
-        instanceConfig.setInstanceId(String.valueOf(instanceId));
+        instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(1024);
         
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
 
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index 2d58e0c..bf8678b 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -19,6 +19,21 @@
 package org.apache.pulsar.io.core;
 
 public interface SinkContext {
+
+    /**
+     * The id of the instance that invokes this function.
+     *
+     * @return the instance id
+     */
+    int getInstanceId();
+
+    /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
+
     /**
      * Record a user defined metric
      * @param metricName The name of the metric
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index 3ea707e..b557f53 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -19,6 +19,21 @@
 package org.apache.pulsar.io.core;
 
 public interface SourceContext {
+
+    /**
+     * The id of the instance that invokes this function.
+     *
+     * @return the instance id
+     */
+    int getInstanceId();
+
+    /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
+
     /**
      * Record a user defined metric
      * @param metricName The name of the metric

Reply via email to