Liu Liu created FLINK-39532:
-------------------------------
Summary: Python AsyncScalarFunctionOperation has a race condition when using the Cython coder
Key: FLINK-39532
URL: https://issues.apache.org/jira/browse/FLINK-39532
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 2.3.0
Reporter: Liu Liu
Currently, the process_element method of AsyncScalarFunctionOperation submits
the input {{value}} to an asyncio event loop and returns immediately, without
waiting for the async function to run. For flattened rows, the input comes from
FlattenRowCoderImpl.decode_from_stream. In the Cython implementation
(coder_impl_fast.pyx), this method returns a reused list, so by the time the
async function runs, that list may already have been overwritten by the next
decoded row. As a result, Python async functions can receive the arguments of
a later row and produce corrupted results.
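The mechanism can be illustrated with a small, single-threaded pure-Python simulation. This is a sketch, not Flink code: the hypothetical ReusingDecoder stands in for a decoder that returns the same list object for every row, and the loop mimics a process_element that schedules the async function and returns without awaiting it. Because the scheduled coroutines only start running after all rows have been decoded, every call sees the last row. In the real operator the timing is nondeterministic, so only some rows may be affected. Snapshotting the row with list(value) before scheduling is shown as one possible mitigation, assumed here for illustration; it is not necessarily the fix adopted for this issue.

```python
import asyncio


class ReusingDecoder:
    """Hypothetical stand-in for a decoder that reuses one list object
    for every decoded row (as described for the Cython coder)."""

    def __init__(self):
        self._row = [None]

    def decode(self, a):
        self._row[0] = a  # overwrite the shared buffer in place
        return self._row


async def async_add_one(i):
    await asyncio.sleep(0.001)
    return i + 1


async def _call(value):
    # value[0] is read only when this coroutine actually starts running.
    return await async_add_one(value[0])


async def run(inputs, copy_before_submit):
    decoder = ReusingDecoder()
    tasks = []
    for a in inputs:
        value = decoder.decode(a)
        if copy_before_submit:
            value = list(value)  # assumed mitigation: snapshot the row first
        # Schedule and move on without awaiting, like process_element.
        tasks.append(asyncio.create_task(_call(value)))
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(run([1, 2, 3], copy_before_submit=False)))  # [4, 4, 4]
    print(asyncio.run(run([1, 2, 3], copy_before_submit=True)))   # [2, 3, 4]
```

Without the copy, all three calls read the shared list after it holds the last row, so every result is 4; with the copy, each call keeps its own row.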
The problem can be reproduced with the following SQL job, which uses two Python
async UDFs:
{code:python}
import asyncio
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
async def async_add_one(i):
    await asyncio.sleep(0.001)
    return i + 1

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.STRING())
async def async_wrap(i):
    await asyncio.sleep(0.001)
    return f"v={i}"
{code}
{code:sql}
INSERT INTO sink
SELECT
  a AS a,
  async_add_one(a) AS plus_one,
  async_wrap(async_add_one(a)) AS wrapped
FROM source;
{code}
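Given the UDF definitions above, every correct sink row satisfies plus_one == a + 1 and wrapped == "v=" + str(a + 1), so corruption shows up as rows whose columns disagree with each other. The helper below is a hypothetical sketch for checking the job's output; it assumes the sink rows have already been collected as (a, plus_one, wrapped) tuples (how they are collected is not shown here) and that no column is NULL.

```python
from typing import Iterable, List, Tuple

# Assumed row shape: (a, plus_one, wrapped), matching the SELECT list above.
Row = Tuple[int, int, str]


def find_corrupted_rows(rows: Iterable[Row]) -> List[Row]:
    """Hypothetical helper: return rows inconsistent with the UDFs,
    i.e. where plus_one != a + 1 or wrapped != f"v={a + 1}"."""
    bad = []
    for a, plus_one, wrapped in rows:
        if plus_one != a + 1 or wrapped != f"v={a + 1}":
            bad.append((a, plus_one, wrapped))
    return bad


if __name__ == "__main__":
    sample = [(1, 2, "v=2"), (2, 3, "v=4")]
    print(find_corrupted_rows(sample))  # [(2, 3, 'v=4')]
```

Checking relationships between columns, rather than comparing against a fixed expected file, keeps the check independent of the input data and of row order.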