Dear readers,
I'm running into some unexpected behaviour in PyFlink when switching the
Python execution mode from process to thread. In thread mode, my `Row` is
converted to a tuple whenever I use a UDF in a map operation. This
conversion to a tuple loses critical information such as the column names.
Below is a minimal example that reproduces the problem (mostly taken from the documentation):
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# This works:
t_env.get_config().set("python.execution-mode", "process")
# This doesn't work:
# t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a Python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)

table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
This results in the following exception:

    2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
    2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
    2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
    2024-03-28 16:32:10     at <string>.<lambda>(<string>:1)
    2024-03-28 16:32:10     at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)

Note that the same code runs fine in process mode. Is this expected
behaviour, and is there a workaround?
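In the meantime, one stopgap I am considering (an untested sketch, not a confirmed fix) is to re-attach the field names myself whenever the UDF receives a plain tuple. It assumes the tuple's elements arrive in the schema's field order (`a`, `b`), and `with_field_names` is a hypothetical helper of my own, not part of the PyFlink API. To keep the example self-contained it returns a plain tuple; in the real job it would return `Row(a.a + 1, a.b * a.b)` and be passed to `udf(...)` exactly as in the example above. I have not checked whether `udf` treats a decorated function any differently.

```python
from collections import namedtuple
from functools import wraps
from typing import Any, Callable, Sequence


def with_field_names(names: Sequence[str]) -> Callable[[Callable[[Any], Any]], Callable[[Any], Any]]:
    """Hypothetical helper: give a positional UDF input its field names back.

    A plain tuple (what the UDF appears to receive in thread mode) is rebuilt
    as a namedtuple so attribute access such as `a.a` works again. Anything
    else, e.g. a PyFlink Row in process mode, is passed through unchanged.
    Assumes the tuple's elements follow the order given in `names`.
    """
    named = namedtuple("NamedRow", names)

    def decorator(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
        @wraps(fn)
        def wrapper(value: Any) -> Any:
            # Exact type check: only bare tuples are converted.
            if type(value) is tuple:
                value = named(*value)
            return fn(value)

        return wrapper

    return decorator


@with_field_names(["a", "b"])
def map_function(a):
    # In the real job this would be `return Row(a.a + 1, a.b * a.b)`.
    return (a.a + 1, a.b * a.b)


print(map_function((2, 4)))  # (3, 16)
print(map_function((0, 0)))  # (1, 0)
```

Accessing fields positionally (`a[0]`, `a[1]`) would avoid the helper entirely, at the cost of giving up the column names in the UDF body, which is exactly what I would like to keep.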
Kind regards,
Wouter