This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bf7224  propagate default resource values in Pulsar Functions (#3636)
1bf7224 is described below

commit 1bf7224d737d4b82a859125d499d21a1c0d4a35a
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Feb 21 15:57:24 2019 -0800

    propagate default resource values in Pulsar Functions (#3636)
    
    * propagate default resource values in Pulsar Functions
    
    * fix unit tests
---
 .../apache/pulsar/common/functions/Resources.java  | 20 +++++++++++++++++++
 .../functions/utils/FunctionConfigUtils.java       | 23 ++++++++++------------
 .../pulsar/functions/utils/SinkConfigUtils.java    | 22 +++++++++------------
 .../pulsar/functions/utils/SourceConfigUtils.java  | 21 ++++++++------------
 .../functions/utils/FunctionConfigUtilsTest.java   |  6 ++++++
 .../functions/utils/SourceConfigUtilsTest.java     |  3 +++
 6 files changed, 56 insertions(+), 39 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
index fde3d38..06513d6 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java
@@ -29,10 +29,30 @@ import lombok.*;
 @NoArgsConstructor
 @Builder(toBuilder=true)
 public class Resources {
+
+    private static final Resources DEFAULT = new Resources();
+
     // Default cpu is 1 core
     private Double cpu = 1d;
     // Default memory is 1GB
     private Long ram = 1073741824l;
     // Default disk is 10GB
     private Long disk = 10737418240l;
+
+    public static Resources getDefaultResources() {
+        return DEFAULT;
+    }
+
+    public static Resources mergeWithDefault(Resources resources) {
+
+        if (resources == null) {
+            return DEFAULT;
+        }
+
+        double cpu = resources.getCpu() == null ? 
Resources.getDefaultResources().getCpu() : resources.getCpu();
+        long ram = resources.getRam() == null ? 
Resources.getDefaultResources().getRam() : resources.getRam();
+        long disk = resources.getDisk() == null ? 
Resources.getDefaultResources().getDisk() : resources.getDisk();
+
+        return new Resources(cpu, ram, disk);
+    }
 }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 3da83c1..aef096a 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -211,19 +211,16 @@ public class FunctionConfigUtils {
         } else {
             functionDetailsBuilder.setParallelism(1);
         }
-        if (functionConfig.getResources() != null) {
-            Function.Resources.Builder bldr = Function.Resources.newBuilder();
-            if (functionConfig.getResources().getCpu() != null) {
-                bldr.setCpu(functionConfig.getResources().getCpu());
-            }
-            if (functionConfig.getResources().getRam() != null) {
-                bldr.setRam(functionConfig.getResources().getRam());
-            }
-            if (functionConfig.getResources().getDisk() != null) {
-                bldr.setDisk(functionConfig.getResources().getDisk());
-            }
-            functionDetailsBuilder.setResources(bldr.build());
-        }
+
+        // use default resources if resources not set
+        Resources resources = 
Resources.mergeWithDefault(functionConfig.getResources());
+
+        Function.Resources.Builder bldr = Function.Resources.newBuilder();
+        bldr.setCpu(resources.getCpu());
+        bldr.setRam(resources.getRam());
+        bldr.setDisk(resources.getDisk());
+        functionDetailsBuilder.setResources(bldr);
+
         return functionDetailsBuilder.build();
     }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index a972947..e7c53c5 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -181,19 +181,15 @@ public class SinkConfigUtils {
         }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
-        if (sinkConfig.getResources() != null) {
-            Function.Resources.Builder bldr = Function.Resources.newBuilder();
-            if (sinkConfig.getResources().getCpu() != null) {
-                bldr.setCpu(sinkConfig.getResources().getCpu());
-            }
-            if (sinkConfig.getResources().getRam() != null) {
-                bldr.setRam(sinkConfig.getResources().getRam());
-            }
-            if (sinkConfig.getResources().getDisk() != null) {
-                bldr.setDisk(sinkConfig.getResources().getDisk());
-            }
-            functionDetailsBuilder.setResources(bldr.build());
-        }
+        // use default resources if resources not set
+        Resources resources = 
Resources.mergeWithDefault(sinkConfig.getResources());
+
+        Function.Resources.Builder bldr = Function.Resources.newBuilder();
+        bldr.setCpu(resources.getCpu());
+        bldr.setRam(resources.getRam());
+        bldr.setDisk(resources.getDisk());
+        functionDetailsBuilder.setResources(bldr);
+
         return functionDetailsBuilder.build();
     }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 94274cf..59b05bd 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -124,19 +124,14 @@ public class SourceConfigUtils {
 
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
-        if (sourceConfig.getResources() != null) {
-            Function.Resources.Builder bldr = Function.Resources.newBuilder();
-            if (sourceConfig.getResources().getCpu() != null) {
-                bldr.setCpu(sourceConfig.getResources().getCpu());
-            }
-            if (sourceConfig.getResources().getRam() != null) {
-                bldr.setRam(sourceConfig.getResources().getRam());
-            }
-            if (sourceConfig.getResources().getDisk() != null) {
-                bldr.setDisk(sourceConfig.getResources().getDisk());
-            }
-            functionDetailsBuilder.setResources(bldr.build());
-        }
+        // use default resources if resources not set
+        Resources resources = 
Resources.mergeWithDefault(sourceConfig.getResources());
+
+        Function.Resources.Builder bldr = Function.Resources.newBuilder();
+        bldr.setCpu(resources.getCpu());
+        bldr.setRam(resources.getRam());
+        bldr.setDisk(resources.getDisk());
+        functionDetailsBuilder.setResources(bldr);
 
         return functionDetailsBuilder.build();
     }
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 0f4f400..b758c72 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -62,6 +62,9 @@ public class FunctionConfigUtilsTest {
         functionConfig.setTimeoutMs(2000l);
         Function.FunctionDetails functionDetails = 
FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = 
FunctionConfigUtils.convertFromDetails(functionDetails);
+
+        // add default resources
+        functionConfig.setResources(Resources.getDefaultResources());
         assertEquals(
                 new Gson().toJson(functionConfig),
                 new Gson().toJson(convertedConfig)
@@ -90,6 +93,9 @@ public class FunctionConfigUtilsTest {
         functionConfig.setWindowConfig(new 
WindowConfig().setWindowLengthCount(10));
         Function.FunctionDetails functionDetails = 
FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = 
FunctionConfigUtils.convertFromDetails(functionDetails);
+
+        // add default resources
+        functionConfig.setResources(Resources.getDefaultResources());
         assertEquals(
                 new Gson().toJson(functionConfig),
                 new Gson().toJson(convertedConfig)
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index a3aad91..0f4b1cb 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -53,6 +53,9 @@ public class SourceConfigUtilsTest {
         sourceConfig.setConfigs(new HashMap<>());
         Function.FunctionDetails functionDetails = 
SourceConfigUtils.convert(sourceConfig, new 
SourceConfigUtils.ExtractedSourceDetails(null, null));
         SourceConfig convertedConfig = 
SourceConfigUtils.convertFromDetails(functionDetails);
+
+        // add default resources
+        sourceConfig.setResources(Resources.getDefaultResources());
         assertEquals(
                 new Gson().toJson(sourceConfig),
                 new Gson().toJson(convertedConfig)

Reply via email to