HuangXingBo commented on a change in pull request #13292: URL: https://github.com/apache/flink/pull/13292#discussion_r481923107
########## File path: flink-python/pyflink/datastream/stream_execution_environment.py ########## @@ -553,6 +553,34 @@ def set_python_executable(self, python_exec: str): .getEnvironmentConfig(self._j_stream_execution_environment) env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec) + def add_jars(self, jars_path: str): + """ + Adds a list of jar files that contain the user-defined function (UDF) classes and all Review comment: I think `add_jars` is not only used in UDF situations; maybe you need to update the docstring accordingly. ########## File path: flink-python/pyflink/datastream/tests/test_stream_execution_environment.py ########## @@ -421,6 +424,51 @@ def check_python_exec(i): expected.sort() self.assertEqual(expected, result) + def test_add_jars(self): + # find kafka connector jars + flink_source_root = _find_flink_source_root() + jars_abs_path = flink_source_root + '/flink-connectors/flink-sql-connector-kafka' + specific_jars = glob.glob(jars_abs_path + '/target/flink*.jar') + specific_jars = ['file://' + specific_jar for specific_jar in specific_jars] + specific_jars = ';'.join(specific_jars) + + self.env.add_jars(specific_jars) + source_topic = 'test_source_topic' + props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'} + type_info = Types.ROW([Types.INT(), Types.STRING()]) + + # Test for kafka consumer + deserialization_schema = JsonRowDeserializationSchema.builder() \ + .type_info(type_info=type_info).build() + + # Will get a ClassNotFoundException if not add the kafka connector into the pipeline jars. 
+ kafka_consumer = FlinkKafkaConsumer(source_topic, deserialization_schema, props) + self.env.add_source(kafka_consumer).print() + self.env.get_execution_plan() + + def test_add_classpaths(self): + # find kafka connector jars + flink_source_root = _find_flink_source_root() + jars_abs_path = flink_source_root + '/flink-connectors/flink-sql-connector-kafka' + specific_jars = glob.glob(jars_abs_path + '/target/flink*.jar') + specific_jars = ['file://' + specific_jar for specific_jar in specific_jars] + specific_jars = ';'.join(specific_jars) + + self.env.add_classpaths(specific_jars) + source_topic = 'test_source_topic' + props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'} + type_info = Types.ROW([Types.INT(), Types.STRING()]) + + # Test for kafka consumer + deserialization_schema = JsonRowDeserializationSchema.builder() \ + .type_info(type_info=type_info).build() + + # Will get a ClassNotFoundException if not add the kafka connector into the pipeline Review comment: ```suggestion # It will raise a ClassNotFoundException if the kafka connector is not added into the pipeline ``` ########## File path: flink-python/pyflink/datastream/stream_execution_environment.py ########## @@ -553,6 +553,34 @@ def set_python_executable(self, python_exec: str): .getEnvironmentConfig(self._j_stream_execution_environment) env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec) + def add_jars(self, jars_path: str): + """ + Adds a list of jar files that contain the user-defined function (UDF) classes and all + classes used from within the UDFs. + + :param jars_path: Path of jars that delimited by ';'. 
+ """ + jvm = get_gateway().jvm + jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key() + add_jars_to_context_class_loader(jars_path.split(";")) + env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \ + .getEnvironmentConfig(self._j_stream_execution_environment) + env_config.setString(jars_key, jars_path) + + def add_classpaths(self, classpaths: str): + """ + Adds a list of URLs that are added to the classpath of each user code classloader of the + program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes Review comment: ```suggestion program. Paths must specify a protocol (e.g. file://) and be accessible by all nodes ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org