This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e814a7f fix updating function/source/sink runtimeFlags and specifying multiple flags (#4572) e814a7f is described below commit e814a7ff8fbfc1fa55dfc8b82b9aed093e8bc1bb Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu Jun 20 16:13:56 2019 -0700 fix updating function/source/sink runtimeFlags and specifying multiple flags (#4572) --- .../apache/pulsar/functions/runtime/RuntimeUtils.java | 15 +++++++++++++-- .../pulsar/functions/utils/FunctionConfigUtils.java | 3 +++ .../apache/pulsar/functions/utils/SinkConfigUtils.java | 3 +++ .../apache/pulsar/functions/utils/SourceConfigUtils.java | 3 +++ .../pulsar/functions/utils/FunctionConfigUtilsTest.java | 16 ++++++++++++++++ .../pulsar/functions/utils/SinkConfigUtilsTest.java | 15 +++++++++++++++ .../pulsar/functions/utils/SourceConfigUtilsTest.java | 15 +++++++++++++++ 7 files changed, 68 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 86e9bdf..fca3e22 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -273,7 +273,9 @@ public class RuntimeUtils { instanceConfig.getFunctionDetails().getName(), shardId)); if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) { - args.add(instanceConfig.getFunctionDetails().getRuntimeFlags()); + for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) { + args.add(runtimeFlagArg); + } } if (instanceConfig.getFunctionDetails().getResources() != null) { Function.Resources resources = instanceConfig.getFunctionDetails().getResources(); @@ -287,7 +289,9 @@ public class RuntimeUtils { } else if 
(instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) { args.add("python"); if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) { - args.add(instanceConfig.getFunctionDetails().getRuntimeFlags()); + for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) { + args.add(runtimeFlagArg); + } } args.add(instanceFile); args.add("--py"); @@ -396,4 +400,11 @@ public class RuntimeUtils { return result.toString(); } + /** + * Regex for splitting a string using space when not surrounded by single or double quotes + */ + public static String[] splitRuntimeArgs(String input) { + return input.split("\\s(?=([^\"]*\"[^\"]*\")*[^\"]*$)"); + } + } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index b96c7c4..824c1db 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -756,6 +756,9 @@ public class FunctionConfigUtils { if (newConfig.getCleanupSubscription() != null) { mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription()); } + if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) { + mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags()); + } return mergedConfig; } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index e59bf46..42eedc3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -520,6 +520,9 @@ public class SinkConfigUtils { if 
(!StringUtils.isEmpty(newConfig.getArchive())) { mergedConfig.setArchive(newConfig.getArchive()); } + if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) { + mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags()); + } return mergedConfig; } } \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index e02bb8b..edbb6d7 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -336,6 +336,9 @@ public class SourceConfigUtils { if (!StringUtils.isEmpty(newConfig.getArchive())) { mergedConfig.setArchive(newConfig.getArchive()); } + if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) { + mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags()); + } return mergedConfig; } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 9c3f706..29c5e55 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -389,6 +389,21 @@ public class FunctionConfigUtilsTest { ); } + @Test + public void testMergeRuntimeFlags() { + FunctionConfig functionConfig = createFunctionConfig(); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("runtimeFlags", "-Dfoo=bar2"); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals( + mergedConfig.getRuntimeFlags(), "-Dfoo=bar2" + ); + mergedConfig.setRuntimeFlags(functionConfig.getRuntimeFlags()); + assertEquals( + new 
Gson().toJson(functionConfig), + new Gson().toJson(mergedConfig) + ); + } + private FunctionConfig createFunctionConfig() { FunctionConfig functionConfig = new FunctionConfig(); functionConfig.setTenant("test-tenant"); @@ -409,6 +424,7 @@ public class FunctionConfigUtilsTest { functionConfig.setTimeoutMs(2000l); functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10)); functionConfig.setCleanupSubscription(true); + functionConfig.setRuntimeFlags("-Dfoo=bar"); return functionConfig; } diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 446deb3..eb6ab39 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -264,6 +264,21 @@ public class SinkConfigUtilsTest { ); } + @Test + public void testMergeRuntimeFlags() { + SinkConfig sinkConfig = createSinkConfig(); + SinkConfig newFunctionConfig = createUpdatedSinkConfig("runtimeFlags", "-Dfoo=bar2"); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newFunctionConfig); + assertEquals( + mergedConfig.getRuntimeFlags(), "-Dfoo=bar2" + ); + mergedConfig.setRuntimeFlags(sinkConfig.getRuntimeFlags()); + assertEquals( + new Gson().toJson(sinkConfig), + new Gson().toJson(mergedConfig) + ); + } + private SinkConfig createSinkConfig() { SinkConfig sinkConfig = new SinkConfig(); sinkConfig.setTenant("test-tenant"); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index 85911ed..2005902 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -198,6 +198,21 @@ public class SourceConfigUtilsTest { ); } + @Test + public void testMergeRuntimeFlags() { + SourceConfig sourceConfig = createSourceConfig(); + SourceConfig newFunctionConfig = createUpdatedSourceConfig("runtimeFlags", "-Dfoo=bar2"); + SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newFunctionConfig); + assertEquals( + mergedConfig.getRuntimeFlags(), "-Dfoo=bar2" + ); + mergedConfig.setRuntimeFlags(sourceConfig.getRuntimeFlags()); + assertEquals( + new Gson().toJson(sourceConfig), + new Gson().toJson(mergedConfig) + ); + } + private SourceConfig createSourceConfig() { SourceConfig sourceConfig = new SourceConfig(); sourceConfig.setTenant("test-tenant");