jerrypeng closed pull request #2847: Cleanup Logging for python functions URL: https://github.com/apache/pulsar/pull/2847
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 03dafaef2f..5f1645dd74 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -187,7 +187,7 @@ def run(self): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, @@ -201,7 +201,7 @@ def run(self): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: self.consumers[topic] = self.pulsar_client.subscribe( re.compile(str(topic)), subscription_name, @@ -237,7 +237,7 @@ def run(self): Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() def actual_execution(self): - Log.info("Started Thread for executing the function") + Log.debug("Started Thread for executing the function") while True: msg = self.queue.get(True) if isinstance(msg, InternalQuitMessage): @@ -321,7 +321,7 @@ def setup_output_serde(self): def setup_producer(self): if self.instance_config.function_details.sink.topic != None and \ len(self.instance_config.function_details.sink.topic) > 0: - Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) + Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), block_if_queue_full=True, diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py index 611a737031..58d43d204d 100644 --- a/pulsar-functions/instance/src/main/python/server.py +++ b/pulsar-functions/instance/src/main/python/server.py @@ -35,20 +35,20 @@ def __init__(self, pyinstance): self.pyinstance = pyinstance def GetFunctionStatus(self, request, context): - Log.info("Came in GetFunctionStatus") + Log.debug("Came in GetFunctionStatus") return self.pyinstance.get_function_status() def GetAndResetMetrics(self, request, context): - Log.info("Came in GetAndResetMetrics") + Log.debug("Came in GetAndResetMetrics") return self.pyinstance.get_and_reset_metrics() def ResetMetrics(self, request, context): - Log.info("Came in ResetMetrics") + Log.debug("Came in ResetMetrics") self.pyinstance.reset_metrics() return request def GetMetrics(self, request, context): - Log.info("Came in GetMetrics") + Log.debug("Came in GetMetrics") return self.pyinstance.get_metrics() def HealthCheck(self, request, context): diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 4736457bbc..56c1ce1c9f 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -36,38 +36,33 @@ def import_class(from_path, full_class_name): from_path = str(from_path) full_class_name = str(full_class_name) - kclass = import_class_from_path(from_path, full_class_name) - if kclass is None: + try: + return import_class_from_path(from_path, full_class_name) + except Exception as e: our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT) - kclass = import_class_from_path(api_dir, full_class_name) - return kclass + try: + return import_class_from_path(api_dir, full_class_name) + except Exception as e: + Log.info("Failed to import class %s from path %s" % (full_class_name, from_path)) + Log.info(e, exc_info=True) + return None def import_class_from_path(from_path, full_class_name): - Log.info('Trying to import %s from path %s' % (full_class_name, from_path)) + Log.debug('Trying to import %s from path %s' % (full_class_name, from_path)) split = full_class_name.split('.') classname_path = '.'.join(split[:-1]) class_name = full_class_name.split('.')[-1] if from_path not in sys.path: - Log.info("Add a new dependency to the path: %s" % from_path) + Log.debug("Add a new dependency to the path: %s" % from_path) sys.path.insert(0, from_path) if not classname_path: - try: - mod = importlib.import_module(class_name) - return mod - except Exception as e: - Log.info("Import failed class_name %s from path %s" % (class_name, from_path)) - Log.info(e, exc_info=True) - return None + mod = importlib.import_module(class_name) + return mod else: - try: - mod = importlib.import_module(classname_path) - retval = getattr(mod, class_name) - return retval - except Exception as e: - Log.info("Import failed class_name %s from path %s" % (class_name, from_path)) - Log.info(e, exc_info=True) - return None + mod = importlib.import_module(classname_path) + retval = getattr(mod, class_name) + return retval def getFullyQualifiedFunctionName(tenant, namespace, name): return "%s/%s/%s" % (tenant, namespace, name) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services