This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new fa212bc Cleanup Logging for python functions (#2847) fa212bc is described below commit fa212bc8bb0cbd81729b498b6041cde794be9684 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Oct 26 10:13:39 2018 -0700 Cleanup Logging for python functions (#2847) * Cleanup Logging for python functions * More statements to debug * More debug statements * More debug --- .../instance/src/main/python/python_instance.py | 8 ++--- .../instance/src/main/python/server.py | 8 ++--- pulsar-functions/instance/src/main/python/util.py | 37 ++++++++++------------ 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 03dafae..5f1645d 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -187,7 +187,7 @@ class PythonInstance(object): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, @@ -201,7 +201,7 @@ class PythonInstance(object): else: serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName) self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) + Log.debug("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: self.consumers[topic] = self.pulsar_client.subscribe( re.compile(str(topic)), subscription_name, @@ -237,7 +237,7 @@ class PythonInstance(object): Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start() def actual_execution(self): - Log.info("Started Thread for executing the function") + Log.debug("Started Thread for executing the function") while True: msg = self.queue.get(True) if isinstance(msg, InternalQuitMessage): @@ -321,7 +321,7 @@ class PythonInstance(object): def setup_producer(self): if self.instance_config.function_details.sink.topic != None and \ len(self.instance_config.function_details.sink.topic) > 0: - Log.info("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) + Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), block_if_queue_full=True, diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py index 611a737..58d43d2 100644 --- a/pulsar-functions/instance/src/main/python/server.py +++ b/pulsar-functions/instance/src/main/python/server.py @@ -35,20 +35,20 @@ class InstanceCommunicationServicer(InstanceCommunication_pb2_grpc.InstanceContr self.pyinstance = pyinstance def GetFunctionStatus(self, request, context): - Log.info("Came in GetFunctionStatus") + Log.debug("Came in GetFunctionStatus") return self.pyinstance.get_function_status() def GetAndResetMetrics(self, request, context): - Log.info("Came in GetAndResetMetrics") + Log.debug("Came in GetAndResetMetrics") return self.pyinstance.get_and_reset_metrics() def ResetMetrics(self, request, context): - Log.info("Came in ResetMetrics") + Log.debug("Came in ResetMetrics") self.pyinstance.reset_metrics() return request def GetMetrics(self, request, context): - Log.info("Came in GetMetrics") + Log.debug("Came in GetMetrics") return self.pyinstance.get_metrics() def HealthCheck(self, request, context): diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 4736457..56c1ce1 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -36,38 +36,33 @@ PULSAR_FUNCTIONS_API_ROOT = 'functions' def import_class(from_path, full_class_name): from_path = str(from_path) full_class_name = str(full_class_name) - kclass = import_class_from_path(from_path, full_class_name) - if kclass is None: + try: + return import_class_from_path(from_path, full_class_name) + except Exception as e: our_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) api_dir = os.path.join(our_dir, PULSAR_API_ROOT, PULSAR_FUNCTIONS_API_ROOT) - kclass = import_class_from_path(api_dir, full_class_name) - return kclass + try: + return import_class_from_path(api_dir, full_class_name) + except Exception as e: + Log.info("Failed to import class %s from path %s" % (full_class_name, from_path)) + Log.info(e, exc_info=True) + return None def import_class_from_path(from_path, full_class_name): - Log.info('Trying to import %s from path %s' % (full_class_name, from_path)) + Log.debug('Trying to import %s from path %s' % (full_class_name, from_path)) split = full_class_name.split('.') classname_path = '.'.join(split[:-1]) class_name = full_class_name.split('.')[-1] if from_path not in sys.path: - Log.info("Add a new dependency to the path: %s" % from_path) + Log.debug("Add a new dependency to the path: %s" % from_path) sys.path.insert(0, from_path) if not classname_path: - try: - mod = importlib.import_module(class_name) - return mod - except Exception as e: - Log.info("Import failed class_name %s from path %s" % (class_name, from_path)) - Log.info(e, exc_info=True) - return None + mod = importlib.import_module(class_name) + return mod else: - try: - mod = importlib.import_module(classname_path) - retval = getattr(mod, class_name) - return retval - except Exception as e: - Log.info("Import failed class_name %s from path %s" % (class_name, from_path)) - Log.info(e, exc_info=True) - return None + mod = importlib.import_module(classname_path) + retval = getattr(mod, class_name) + return retval def getFullyQualifiedFunctionName(tenant, namespace, name): return "%s/%s/%s" % (tenant, namespace, name)