Liu Liu created FLINK-39532:
-------------------------------

             Summary: Python AsyncScalarFunctionOperation has race condition when using Cython coder
                 Key: FLINK-39532
                 URL: https://issues.apache.org/jira/browse/FLINK-39532
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 2.3.0
            Reporter: Liu Liu


Currently, in {{AsyncScalarFunctionOperation}}, the {{process_element}} method
submits the input {{value}} to an asyncio event loop and returns immediately.
For flattened rows, the input comes from
{{FlattenRowCoderImpl.decode_from_stream}}. In the Cython implementation in
{{coder_impl_fast.pyx}}, this method returns a reused list, so by the time the
async function runs, that list has already been overwritten by the next decoded
row. This corrupts the results of Python async functions.
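The following is a minimal, self-contained sketch of the race described above. It is not Flink code. It assumes a simplified model: a hypothetical {{ReusingDecoder}} stands in for the Cython {{FlattenRowCoderImpl}} by returning the same list object on every call, and one task per row is scheduled without being awaited, mimicking {{process_element}} returning immediately. Everything runs on a single event loop so the interleaving is deterministic; the real operator's threading may differ. The hypothetical {{copy_input}} flag shows one possible mitigation (snapshotting the row before scheduling), not necessarily the fix Flink will adopt.

```python
import asyncio
from typing import List


class ReusingDecoder:
    """Hypothetical stand-in for a Cython decoder that returns the same
    list object on every call and overwrites its contents in place."""

    def __init__(self) -> None:
        self._row: List[int] = [0]

    def decode(self, value: int) -> List[int]:
        self._row[0] = value  # overwrite the shared buffer
        return self._row


async def async_add_one(row: List[int]) -> int:
    # Yield to the event loop; later rows are decoded before row[0] is read.
    await asyncio.sleep(0.001)
    return row[0] + 1


async def run(inputs: List[int], copy_input: bool) -> List[int]:
    decoder = ReusingDecoder()
    tasks = []
    for v in inputs:
        row = decoder.decode(v)
        if copy_input:
            row = list(row)  # snapshot before handing off to the event loop
        # Schedule and move on without awaiting, like process_element.
        tasks.append(asyncio.ensure_future(async_add_one(row)))
    return list(await asyncio.gather(*tasks))


if __name__ == "__main__":
    inputs = [1, 2, 3]
    print(asyncio.run(run(inputs, copy_input=False)))  # [4, 4, 4]
    print(asyncio.run(run(inputs, copy_input=True)))   # [2, 3, 4]
```

With {{copy_input=False}}, all three tasks read the last decoded value from the shared list, so every result is 4 instead of 2, 3 and 4.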

This problem has been reproduced with the following SQL job, which uses Python
async UDFs:

{code:python}
import asyncio

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
async def async_add_one(i):
    await asyncio.sleep(0.001)
    return i + 1

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.STRING())
async def async_wrap(i):
    await asyncio.sleep(0.001)
    return f"v={i}"
{code}

{code:sql}
INSERT INTO sink
SELECT
    a AS a,
    async_add_one(a) AS plus_one,
    async_wrap(async_add_one(a)) AS wrapped
FROM source;
{code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
