This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6506f9bfd48 [improve][fn] support reading config options from file in Function Python Runner (#18951)
6506f9bfd48 is described below

commit 6506f9bfd4896de489d29fa7ec46bd11d9e6d94e
Author: laminar <[email protected]>
AuthorDate: Wed Feb 8 09:17:01 2023 +0800

    [improve][fn] support reading config options from file in Function Python Runner (#18951)
    
    Signed-off-by: laminar <[email protected]>
---
 .../instance/src/main/python/python_instance.py    |   4 +-
 .../src/main/python/python_instance_main.py        | 107 +++++++++++++++++----
 pulsar-functions/instance/src/main/python/util.py  |  19 ++++
 .../src/test/python/test_python_instance_main.py   |  69 +++++++++++++
 .../src/test/python/test_python_runtime_config.ini |  27 ++++++
 5 files changed, 205 insertions(+), 21 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index f4ad66527e6..cf53e75d9d0 100755
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -80,7 +80,8 @@ class PythonInstance(object):
                pulsar_client,
                secrets_provider,
                cluster_name,
-               state_storage_serviceurl):
+               state_storage_serviceurl,
+               config_file):
     self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     # set queue size to one since consumers already have internal queues. Just 
use queue to communicate message from
@@ -114,6 +115,7 @@ class PythonInstance(object):
                            instance_id, cluster_name,
                            "%s/%s/%s" % (function_details.tenant, 
function_details.namespace, function_details.name)]
     self.stats = Stats(self.metrics_labels)
+    self.config_file = config_file
 
   def health_check(self):
     self.last_health_check_ts = time.time()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 3967635365c..2d6520b2e99 100755
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -49,47 +49,113 @@ from bookkeeper.kv.client import Client
 to_run = True
 Log = log.Log
 
+
 def atexit_function(signo, _frame):
   global to_run
   Log.info("Interrupted by %d, shutting down" % signo)
   to_run = False
 
-def main():
-  # Setup signal handlers
-  signal.signal(signal.SIGTERM, atexit_function)
-  signal.signal(signal.SIGHUP, atexit_function)
-  signal.signal(signal.SIGINT, atexit_function)
 
+def generate_arguments_parser():
   parser = argparse.ArgumentParser(description='Pulsar Functions Python 
Instance')
-  parser.add_argument('--function_details', required=True, help='Function 
Details Json String')
-  parser.add_argument('--py', required=True, help='Full Path of Function Code 
File')
-  parser.add_argument('--instance_id', required=True, help='Instance Id')
-  parser.add_argument('--function_id', required=True, help='Function Id')
-  parser.add_argument('--function_version', required=True, help='Function 
Version')
-  parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar 
Service Url')
+  parser.add_argument('--function_details', required=False, help='Function 
Details Json String')
+  parser.add_argument('--py', required=False, help='Full Path of Function Code 
File')
+  parser.add_argument('--instance_id', required=False, help='Instance Id')
+  parser.add_argument('--function_id', required=False, help='Function Id')
+  parser.add_argument('--function_version', required=False, help='Function 
Version')
+  parser.add_argument('--pulsar_serviceurl', required=False, help='Pulsar 
Service Url')
   parser.add_argument('--client_auth_plugin', required=False, help='Client 
authentication plugin')
   parser.add_argument('--client_auth_params', required=False, help='Client 
authentication params')
   parser.add_argument('--use_tls', required=False, help='Use tls')
   parser.add_argument('--tls_allow_insecure_connection', required=False, 
help='Tls allow insecure connection')
   parser.add_argument('--hostname_verification_enabled', required=False, 
help='Enable hostname verification')
   parser.add_argument('--tls_trust_cert_path', required=False, help='Tls trust 
cert file path')
-  parser.add_argument('--port', required=True, help='Instance Port', type=int)
-  parser.add_argument('--metrics_port', required=True, help="Port metrics will 
be exposed on", type=int)
-  parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
-  parser.add_argument('--logging_directory', required=True, help='Logging 
Directory')
-  parser.add_argument('--logging_file', required=True, help='Log file name')
+  parser.add_argument('--port', required=False, help='Instance Port', type=int)
+  parser.add_argument('--metrics_port', required=False, help="Port metrics 
will be exposed on", type=int)
+  parser.add_argument('--max_buffered_tuples', required=False, help='Maximum 
number of Buffered tuples')
+  parser.add_argument('--logging_directory', required=False, help='Logging 
Directory')
+  parser.add_argument('--logging_file', required=False, help='Log file name')
   parser.add_argument('--logging_level', required=False, help='Logging level')
-  parser.add_argument('--logging_config_file', required=True, help='Config 
file for logging')
-  parser.add_argument('--expected_healthcheck_interval', required=True, 
help='Expected time in seconds between health checks', type=int)
+  parser.add_argument('--logging_config_file', required=False, help='Config 
file for logging')
+  parser.add_argument('--expected_healthcheck_interval', required=False, 
help='Expected time in seconds between health checks', type=int)
   parser.add_argument('--secrets_provider', required=False, help='The 
classname of the secrets provider')
   parser.add_argument('--secrets_provider_config', required=False, help='The 
config that needs to be passed to secrets provider')
   parser.add_argument('--install_usercode_dependencies', required=False, 
help='For packaged python like wheel files, do we need to install all 
dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For 
packaged python like wheel files, which repository to pull the dependencies 
from')
   parser.add_argument('--extra_dependency_repository', required=False, 
help='For packaged python like wheel files, any extra repository to pull the 
dependencies from')
   parser.add_argument('--state_storage_serviceurl', required=False, 
help='Managed State Storage Service Url')
-  parser.add_argument('--cluster_name', required=True, help='The name of the 
cluster this instance is running on')
+  parser.add_argument('--cluster_name', required=False, help='The name of the 
cluster this instance is running on')
+  parser.add_argument('--config_file', required=False, default="", 
help='Configuration file name', type=str)
+  return parser
+
+def merge_arguments(args, config_file):
+  """
+  This function is used to merge arguments passed in via the command line
+  and those passed in via the configuration file during initialization.
+
+  :param args: arguments passed in via the command line
+  :param config_file: configuration file name (path)
+
+  During the merge process, the arguments passed in via the command line have higher priority,
+  so only optional arguments need to be merged.
+  """
+  if config_file is None:
+    return
+  config = util.read_config(config_file)
+  if not config:
+    return
+  default_config = config["DEFAULT"]
+  if not default_config:
+    return
+  for k, v in vars(args).items():
+    if k == "config_file":
+      continue
+    if not v and default_config.get(k, None):
+      vars(args)[k] = default_config.get(k)
+
+
+def validate_arguments(args):
+  """
+  This function is used to verify the merged arguments,
+  mainly to check whether the mandatory arguments are assigned properly.
+
+  :param args: arguments after merging
+  """
+  mandatory_args_map = {
+    "function_details": args.function_details,
+    "py": args.py,
+    "instance_id": args.instance_id,
+    "function_id": args.function_id,
+    "function_version": args.function_version,
+    "pulsar_serviceurl": args.pulsar_serviceurl,
+    "port": args.port,
+    "metrics_port": args.metrics_port,
+    "max_buffered_tuples": args.max_buffered_tuples,
+    "logging_directory": args.logging_directory,
+    "logging_file": args.logging_file,
+    "logging_config_file": args.logging_config_file,
+    "expected_healthcheck_interval": args.expected_healthcheck_interval,
+    "cluster_name": args.cluster_name
+  }
+  missing_args = []
+  for k, v in mandatory_args_map.items():
+    if v is None:
+      missing_args.append(k)
+  if missing_args:
+    print("The following arguments are required:", missing_args)
+    sys.exit(1)
+
+
+def main():
+  # Setup signal handlers
+  signal.signal(signal.SIGTERM, atexit_function)
+  signal.signal(signal.SIGHUP, atexit_function)
+  signal.signal(signal.SIGINT, atexit_function)
 
+  parser = generate_arguments_parser()
   args = parser.parse_args()
+  merge_arguments(args, args.config_file)
+  validate_arguments(args)
   function_details = Function_pb2.FunctionDetails()
   args.function_details = str(args.function_details)
   if args.function_details[0] == '\'':
@@ -216,7 +282,8 @@ def main():
                                               pulsar_client,
                                               secrets_provider,
                                               args.cluster_name,
-                                              state_storage_serviceurl)
+                                              state_storage_serviceurl,
+                                              args.config_file)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 48ba2f0e6d7..f5868093fae 100755
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -25,6 +25,8 @@ import os
 import inspect
 import sys
 import importlib
+import configparser
+
 from threading import Timer
 from pulsar.functions import serde
 
@@ -80,6 +82,23 @@ def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id):
 def get_properties(fullyQualifiedName, instanceId):
     return {"application": "pulsar-function", "id": str(fullyQualifiedName), 
"instance_id": str(instanceId)}
 
+def read_config(config_file):
+    """
+    The content of the configuration file is styled as follows:
+
+    [DEFAULT]
+    parameter1 = value1
+    parameter2 = value2
+    parameter3 = value3
+    ...
+    """
+    if config_file == "":
+        return None
+
+    cfg = configparser.ConfigParser()
+    cfg.read(config_file)
+    return cfg
+
 class FixedTimer():
 
     def __init__(self, t, hFunction, name="timer-thread"):
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance_main.py b/pulsar-functions/instance/src/test/python/test_python_instance_main.py
new file mode 100644
index 00000000000..af248691919
--- /dev/null
+++ b/pulsar-functions/instance/src/test/python/test_python_instance_main.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# DEPENDENCIES:  unittest2
+import python_instance_main
+
+import os
+import log
+import unittest
+
+class TestContextImpl(unittest.TestCase):
+
+  def Any(cls):
+    class Any(cls):
+      def __eq__(self, other):
+        return True
+    return Any()
+
+  def setUp(self):
+    log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini")
+
+  def test_arguments(self):
+    parser = python_instance_main.generate_arguments_parser()
+    argv = [
+      "--function_details", "test_function_details",
+      "--py", "test_py",
+      "--instance_id", "test_instance_id",
+      "--function_id", "test_function_id",
+      "--function_version", "test_function_version",
+      "--pulsar_serviceurl", "test_pulsar_serviceurl",
+      "--client_auth_plugin", "test_client_auth_plugin",
+      "--client_auth_params", "test_client_auth_params",
+      "--tls_allow_insecure_connection", "true",
+      "--hostname_verification_enabled", "true",
+      "--tls_trust_cert_path", "test_tls_trust_cert_path",
+      "--port", "1000",
+      "--metrics_port", "1001",
+      "--max_buffered_tuples", "100",
+      "--config_file", "test_python_runtime_config.ini"
+    ]
+    args = parser.parse_args(argv)
+    python_instance_main.merge_arguments(args, args.config_file)
+    # argument from command line test
+    self.assertEqual(args.function_details, "test_function_details")
+    # argument from config file test
+    self.assertEqual(args.use_tls, "true")
+    # argument read priority test
+    self.assertEqual(args.port, 1000)
+    # mandatory argument test
+    self.assertEqual(args.expected_healthcheck_interval, "50")
+    # optional argument test
+    self.assertEqual(args.secrets_provider, None)
diff --git a/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini b/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini
new file mode 100644
index 00000000000..8e172647178
--- /dev/null
+++ b/pulsar-functions/instance/src/test/python/test_python_runtime_config.ini
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[DEFAULT]
+port=5000
+metrics_port=5001
+use_tls=true
+logging_directory=test_logging_directory
+logging_file=test_logging_file
+logging_config_file=test_logging_config_file
+expected_healthcheck_interval=50
\ No newline at end of file

Reply via email to