This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new c9828e8d287 [FLINK-37505][python] Add pyflink YAML based config support
c9828e8d287 is described below
commit c9828e8d2879e8a91187a105967b61a410119724
Author: Gabor Somogyi <[email protected]>
AuthorDate: Thu Mar 20 16:52:32 2025 +0100
[FLINK-37505][python] Add pyflink YAML based config support
Co-authored-by: Ferenc Csaky <[email protected]>
---
flink-python/pyflink/common/configuration.py | 18 +++++++-----
.../pyflink/common/tests/test_configuration.py | 20 +++++++++----
.../datastream/stream_execution_environment.py | 21 +++++++------
.../tests/test_stream_execution_environment.py | 34 ++++++++++++++++++++++
flink-python/pyflink/table/table_config.py | 5 +++-
flink-python/pyflink/table/table_environment.py | 9 ++++--
6 files changed, 82 insertions(+), 25 deletions(-)
diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py
index 779568da151..8042fc9afb4 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -69,20 +69,24 @@ class Configuration:
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
- jar_urls = Configuration.parse_jars_value(value, jvm)
+ jar_urls = Configuration.parse_list_value(
+ value,
+ jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+ )
add_jars_to_context_class_loader(jar_urls)
self._j_configuration.setString(key, value)
return self
@staticmethod
- def parse_jars_value(value: str, jvm):
- is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
- if is_standard_yaml:
+ def parse_list_value(value: str, standard_yaml: bool = True):
+ if not value:
+ return []
+ if standard_yaml:
from ruamel.yaml import YAML
yaml = YAML(typ='safe')
- jar_urls_list = yaml.load(value)
- if isinstance(jar_urls_list, list):
- return jar_urls_list
+ value_list = yaml.load(value)
+ if isinstance(value_list, list):
+ return value_list
return value.split(";")
def get_integer(self, key: str, default_value: int) -> int:
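For context on the parse_list_value fallback above: ruamel.yaml's safe loader returns a Python list for both flow-style ("[a, b]") and block-style ("- a\n- b") sequences, while a legacy semicolon-joined value loads as a single scalar and therefore falls through to the split(";") path. A minimal sketch of the behavior (jar names are illustrative; calling the staticmethod directly needs no JVM gateway):

    from pyflink.common import Configuration

    # empty input short-circuits to an empty list
    assert Configuration.parse_list_value(None) == []
    # YAML flow sequence
    assert Configuration.parse_list_value("[jar1, jar2]") == ["jar1", "jar2"]
    # YAML block sequence
    assert Configuration.parse_list_value("- jar1\n- jar2") == ["jar1", "jar2"]
    # "jar1;jar2" loads as a plain YAML scalar, not a list, so the
    # function falls back to split(";") -- same result as standard_yaml=False
    assert Configuration.parse_list_value("jar1;jar2") == ["jar1", "jar2"]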
diff --git a/flink-python/pyflink/common/tests/test_configuration.py b/flink-python/pyflink/common/tests/test_configuration.py
index 978bba538a7..26ca1e92cf9 100644
--- a/flink-python/pyflink/common/tests/test_configuration.py
+++ b/flink-python/pyflink/common/tests/test_configuration.py
@@ -18,7 +18,6 @@
from copy import deepcopy
from pyflink.common import Configuration
-from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -165,16 +164,27 @@ class ConfigurationTests(PyFlinkTestCase):
self.assertEqual(str(conf), "{k1=v1, k2=1}")
- def test_parse_jars_value(self):
- jvm = get_gateway().jvm
+ def test_parse_list_value(self):
+ # test None
+ value = None
+ expected_result = []
+ result = Configuration.parse_list_value(value)
+ self.assertEqual(result, expected_result)
+
# test parse YAML list
+ value = "[jar1, jar2, jar3]"
+ expected_result = ['jar1', 'jar2', 'jar3']
+ result = Configuration.parse_list_value(value)
+ self.assertEqual(result, expected_result)
+
+ # test parse multiline YAML list
value = "- jar1\n- jar2\n- jar3"
expected_result = ['jar1', 'jar2', 'jar3']
- result = Configuration.parse_jars_value(value, jvm)
+ result = Configuration.parse_list_value(value)
self.assertEqual(result, expected_result)
# test parse legacy pattern
value = "jar1;jar2;jar3"
expected_result = ['jar1', 'jar2', 'jar3']
- result = Configuration.parse_jars_value(value, jvm)
+ result = Configuration.parse_list_value(value)
self.assertEqual(result, expected_result)
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index ce84e94ec0e..9553d4f4bfe 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -768,11 +768,12 @@ class StreamExecutionEnvironment(object):
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
.getEnvironmentConfig(self._j_stream_execution_environment)
- old_jar_paths = env_config.getString(jars_key, None)
- joined_jars_path = ';'.join(jars_path)
- if old_jar_paths and old_jar_paths.strip():
- joined_jars_path = ';'.join([old_jar_paths, joined_jars_path])
- env_config.setString(jars_key, joined_jars_path)
+ old_jars_path = env_config.getString(jars_key, None)
+ old_jars_list = Configuration.parse_list_value(
+ old_jars_path,
+ jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml())
+ joined_jars_list = [*old_jars_list, *jars_path]
+ env_config.setString(jars_key, str(joined_jars_list))
def add_classpaths(self, *classpaths: str):
"""
@@ -787,10 +788,12 @@ class StreamExecutionEnvironment(object):
env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
.getEnvironmentConfig(self._j_stream_execution_environment)
old_classpaths = env_config.getString(classpaths_key, None)
- joined_classpaths = ';'.join(list(classpaths))
- if old_classpaths and old_classpaths.strip():
- joined_classpaths = ';'.join([old_classpaths, joined_classpaths])
- env_config.setString(classpaths_key, joined_classpaths)
+ old_classpaths_list = Configuration.parse_list_value(
+ old_classpaths,
+ jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+ )
+ joined_classpaths_list = [*old_classpaths_list, *classpaths]
+ env_config.setString(classpaths_key, str(joined_classpaths_list))
def get_default_local_parallelism(self) -> int:
"""
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 1ac1eb95ed2..8be06db1d03 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -554,6 +554,40 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
expected.sort()
self.assertEqual(expected, result)
+ def test_add_jars_basic(self):
+ jvm = get_gateway().jvm
+ jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
+ env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+ .getEnvironmentConfig(self.env._j_stream_execution_environment)
+
+ old_jars = env_config.getString(jars_key, None)
+ self.assertIsNone(old_jars)
+
+ self.env.add_jars('file://1.jar')
+ new_jars = env_config.getString(jars_key, None)
+ self.assertEqual(new_jars, '[\'file://1.jar\']')
+
+ self.env.add_jars('file://2.jar', 'file://3.jar')
+ new_jars = env_config.getString(jars_key, None)
+ self.assertEqual(new_jars, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+
+ def test_add_classpaths_basic(self):
+ jvm = get_gateway().jvm
+ classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
+ env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+ .getEnvironmentConfig(self.env._j_stream_execution_environment)
+
+ old_classpaths = env_config.getString(classpaths_key, None)
+ self.assertIsNone(old_classpaths)
+
+ self.env.add_classpaths('file://1.jar')
+ new_classpaths = env_config.getString(classpaths_key, None)
+ self.assertEqual(new_classpaths, '[\'file://1.jar\']')
+
+ self.env.add_classpaths('file://2.jar', 'file://3.jar')
+ new_classpaths = env_config.getString(classpaths_key, None)
+ self.assertEqual(new_classpaths, '[\'file://1.jar\', \'file://2.jar\', \'file://3.jar\']')
+
def test_add_jars(self):
# find kafka connector jars
flink_source_root = _find_flink_source_root()
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index ba17767c776..d9869e03c2c 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -106,7 +106,10 @@ class TableConfig(object):
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
- jar_urls = Configuration.parse_jars_value(value, jvm)
+ jar_urls = Configuration.parse_list_value(
+ value,
+ jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+ )
add_jars_to_context_class_loader(jar_urls)
return self
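Illustrating the TableConfig path above: pipeline.jars (and pipeline.classpaths) can now be supplied either as a YAML list or in the legacy semicolon-joined form, and both end up on the context class loader. A hedged usage sketch (requires a running gateway; the jar paths are hypothetical):

    from pyflink.table import TableConfig

    config = TableConfig.get_default()
    # standard config.yaml mode: the value is parsed as a YAML flow sequence
    config.set("pipeline.jars", "[file:///tmp/udf1.jar, file:///tmp/udf2.jar]")
    # legacy flink-conf.yaml mode equivalent:
    # config.set("pipeline.jars", "file:///tmp/udf1.jar;file:///tmp/udf2.jar")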
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index ebfcc7cb095..b36021bd003 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1545,7 +1545,10 @@ class TableEnvironment(object):
if jar_urls:
jvm = get_gateway().jvm
jar_urls_list = []
- parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm)
+ parsed_jar_urls = Configuration.parse_list_value(
+ jar_urls,
+ jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+ )
url_strings = [
jvm.java.net.URL(url).toString() if url else ""
for url in parsed_jar_urls
@@ -1553,9 +1556,9 @@ class TableEnvironment(object):
self._parse_urls(url_strings, jar_urls_list)
j_configuration = get_j_env_configuration(self._get_j_env())
- parsed_jar_urls = Configuration.parse_jars_value(
+ parsed_jar_urls = Configuration.parse_list_value(
j_configuration.getString(config_key, ""),
- jvm
+ jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
)
self._parse_urls(parsed_jar_urls, jar_urls_list)
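The plain Configuration path accepts the same two value shapes; a short end-to-end sketch under standard YAML mode (requires a JVM gateway; the path is hypothetical):

    from pyflink.common import Configuration

    conf = Configuration()
    # set_string recognizes the pipeline.jars key, parses the value via
    # parse_list_value, and adds the jars to the context class loader
    conf.set_string("pipeline.jars", "[file:///tmp/udf.jar]")
    assert conf.get_string("pipeline.jars", "") == "[file:///tmp/udf.jar]"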