(This question is cross-posted on StackOverflow:
https://stackoverflow.com/questions/66756612/failed-to-unit-test-pyflink-udf)

I am using PyFlink and I want to unit test my UDF written in Python.

Consider the simple UDF below:

```python
# tasks/helloworld/udf.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT(), DataTypes.INT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j
```

I wrote a test that should fail, since 1 + 1 is not 3:
```python
from tasks.helloworld.udf import add

def test_add():
    assert add(1,1) == 3
```

Unexpectedly, the test passes when I run `pytest`:
```
> pytest
============================= test session starts ==============================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item

tests/test_helloworld.py .                                               [100%]

=============================== warnings summary ===============================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    from collections import (

../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/udf.py:291: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    if not isinstance(input_types, collections.Iterable) \

-- Docs: https://docs.pytest.org/en/stable/warnings.html
======================== 1 passed, 6 warnings in 0.98s =========================
```

However, the test fails as expected if I remove the
`@udf(input_types=[...], result_type=...)` decorator:
```
> pytest
============================= test session starts ==============================
platform darwin -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1
rootdir: /Users/chenyisheng/source/yiksanchan/pytest-flink
collected 1 item

tests/test_helloworld.py F                                               [100%]

=================================== FAILURES ===================================
___________________________________ test_add ___________________________________

    def test_add():
>       assert add(1,1) == 3
E       assert 2 == 3
E        +  where 2 = add(1, 1)

tests/test_helloworld.py:4: AssertionError
=============================== warnings summary ===============================
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
../../../../../usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13
  /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_collections.py:13: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    from collections import (

-- Docs: https://docs.pytest.org/en/stable/warnings.html
=========================== short test summary info ============================
FAILED tests/test_helloworld.py::test_add - assert 2 == 3
======================== 1 failed, 5 warnings in 0.17s =========================
```
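
A possible explanation, sketched in plain Python with no PyFlink dependency: if the decorator replaces `add` with a wrapper object whose call returns a lazy expression, and that expression overloads `==` to build another expression, then `add(1,1) == 3` never yields `False`, and `assert` sees a truthy object. The names `FakeExpression`, `FakeUdfWrapper`, and `fake_udf` below are hypothetical stand-ins, not PyFlink's API, and whether PyFlink behaves this way is an assumption to check against the installed version.

```python
class FakeExpression:
    """Hypothetical stand-in for a lazily evaluated table expression."""

    def __init__(self, description):
        self.description = description

    def __eq__(self, other):
        # The comparison builds a new expression instead of returning a bool.
        return FakeExpression(f"({self.description} == {other!r})")


class FakeUdfWrapper:
    """Hypothetical stand-in for the object a UDF decorator might return."""

    def __init__(self, func):
        # Keep a reference to the undecorated Python function.
        self._func = func

    def __call__(self, *args):
        # Calling the wrapper describes the call; it does not run the function.
        return FakeExpression(f"{self._func.__name__}{args!r}")


def fake_udf(func):
    return FakeUdfWrapper(func)


@fake_udf
def add(i, j):
    return i + j


comparison = add(1, 1) == 3
# comparison is a FakeExpression; objects without __bool__ or __len__ are
# truthy, so this assert passes even though 1 + 1 != 3.
assert comparison

# Testing the undecorated function gives an ordinary integer comparison.
assert add._func(1, 1) == 2
```

If PyFlink's wrapper keeps the original function in a similar attribute (assumed here to be `_func`), the test could call that attribute directly so that the assertion compares plain integers.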

The full example can be found at
https://github.com/YikSanChan/how-to-pytest-flink.

Best,
Yik San
