Dian Fu created FLINK-34985:
-------------------------------

Summary: It doesn't support to access fields by name for map function in thread mode
Key: FLINK-34985
URL: https://issues.apache.org/jira/browse/FLINK-34985
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Dian Fu
Reported in Slack channel: [https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589]

hi all, I seem to be running into an issue when switching to thread mode in PyFlink. In a UDF the {{Row}} seems to get converted into a tuple, and you cannot access fields by their name anymore. In process mode it works fine. This bug can easily be reproduced using this minimal example, which is close to the PyFlink docs:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)

table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```

The exception I get in thread mode is:

2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at <string>.<lambda>(<string>:1)
2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
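The `AttributeError` in the trace is ordinary Python behavior once the UDF receives a bare tuple instead of a {{Row}}: a tuple has no named attributes. The sketch below reproduces that without PyFlink and shows one possible defensive workaround on the user side, re-attaching the field names `a` and `b` from the schema declared in the report. `InputRow`, `as_named`, `map_function_original` and `map_function_defensive` are hypothetical helpers introduced for illustration; they are not PyFlink APIs, and this is not the fix made for FLINK-34985.

```python
from collections import namedtuple
from typing import Any

# Hypothetical stand-in for the input schema ROW<a BIGINT, b BIGINT>.
# Not a PyFlink type; it only re-attaches the field names "a" and "b".
InputRow = namedtuple("InputRow", ["a", "b"])


def as_named(value: Any) -> Any:
    """Wrap a bare tuple in InputRow so its fields can be read by name.

    Anything that is not a bare tuple (including a namedtuple, which
    already has `_fields`) is returned unchanged.
    """
    if isinstance(value, tuple) and not hasattr(value, "_fields"):
        return InputRow(*value)
    return value


def map_function_original(a: Any) -> tuple:
    # Attribute access as in the report: raises AttributeError on a bare tuple.
    return (a.a + 1, a.b * a.b)


def map_function_defensive(a: Any) -> tuple:
    # Same computation, but tolerant of receiving a bare tuple.
    a = as_named(a)
    return (a.a + 1, a.b * a.b)


if __name__ == "__main__":
    try:
        map_function_original((2, 4))
    except AttributeError as exc:
        print(exc)  # 'tuple' object has no attribute 'a'
    print(map_function_defensive((2, 4)))  # (3, 16)
    print(map_function_defensive((0, 0)))  # (1, 0)
```

The wrapper hard-codes the field order, so it only works if the tuple's positions match the declared schema; it hides the symptom rather than addressing why thread mode passes a tuple in the first place.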