OK. The datagen connector with the sequence option reproduces this issue easily, and it also produces an incorrect result. I had datagen generate a sequence from 1 to 5 and let the UDF randomly return either null or bytes. Surprisingly, not only was the UDF executed twice, but the WHERE clause also failed to enforce `IS NOT NULL`: rows with a null `id_in_bytes` still appear in the result. This came as a big shock to me; an `IS NOT NULL` condition in a WHERE clause is a fundamental SQL feature and it should not break. I have added my findings to FLINK-29855 <https://issues.apache.org/jira/browse/FLINK-29855>, and here are the repro steps:

Query:

INSERT INTO print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL

Result:

+I[null, 1]
+I[[50], 2]
+I[null, 4]

UDF:

public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    // Random integer in [1, 9]; even values return the bytes, odd values return null.
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]", intputNum, randomNumber);
    if (randomNumber % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### ", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}

Log:

2022-11-02 13:38:56,765 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:56,766 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:57,763 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:02,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:02,762 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:03,758 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
2022-11-02 13:39:04,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
2022-11-02 13:39:05,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
2022-11-02 13:39:06,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
2022-11-02 13:39:07,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:07,763 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
2022-11-02 13:39:08,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
2022-11-02 13:39:09,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
2022-11-02 13:39:10,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
2022-11-02 13:39:11,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
2022-11-02 13:39:12,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.

On Thu, Nov 3, 2022 at 3:04 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> The datagen may produce rows with the same values.
>
> From my side, in Flink, the UDF shouldn't process one row twice;
> otherwise, it would be a critical bug.
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Xinyi Yan" <yanxi...@apache.org>
> *To: *"User" <user@flink.apache.org>
> *Sent: *Thursday, November 3, 2022, 6:59:20 AM
> *Subject: *Question about UDF randomly processed input row twice
>
> Hi all,
> I found a weird UDF behavior: a single thread processes the UDF twice for
> some rows; see FLINK-29855 <https://issues.apache.org/jira/browse/FLINK-29855>
> for more details. Basically, I created a datagen table with a random integer
> (1 row per second) and passed this value into the UDF. Inside the UDF, I
> simply take the input number mod 2, convert the integer to a byte array, and
> log it for debugging purposes. As you can see, the UDF has been called twice
> for some of the rows. I'm not sure whether this duplicated UDF call is
> expected, or why it doesn't consistently produce duplicated calls for all
> rows. In case the local env setup is a concern, I only have 1 task manager
> and 1 task slot in my local Flink cluster.
>
> Thanks!
>
> UDF
>
> public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
>     byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
>     if (intputNum % 2 == 0) {
>         LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ### ", results, intputNum);
>         return results;
>     }
>     LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
>     return null;
> }
>
>
> Main class DDLs
>
> tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
> tEnv.executeSql("CREATE TABLE datagenTable (\n" +
>         " id INT\n" +
>         ") WITH (\n" +
>         " 'connector' = 'datagen',\n" +
>         " 'number-of-rows' = '100',\n" +
>         " 'rows-per-second' = '1'\n" +
>         ")");
> tEnv.executeSql("CREATE TABLE print_table (\n" +
>         " id_in_bytes VARBINARY,\n" +
>         " id INT\n" +
>         ") WITH (\n" +
>         " 'connector' = 'print'\n" +
>         ")");
> tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");
>
>
> Logging
>
> 2022-11-02 13:38:58,760 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num *1506311954*. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:58,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num *1506311954*. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:59,759 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num *-1800690437*.
> 2022-11-02 13:39:00,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num *1428877483*.
> 2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num *-1794263686*. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:01,761 INFO org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num *-1794263686*. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
>
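
One possible explanation for both symptoms, stated as an assumption rather than something confirmed in this thread: if the planner inlines the UDF call into both the WHERE predicate and the SELECT projection, the UDF runs once to evaluate `IS NOT NULL` and a second time to produce the output value. With a random UDF the two calls can disagree, so a row can pass the filter and still be emitted with null. That would also be consistent with the logs above, where only the rows that returned bytes are logged twice. Below is a minimal plain-Java sketch of that hypothesis. It does not use Flink at all, and `DoubleEvalSketch`, `FlipFlopUdf`, `inlined` and `evaluatedOnce` are hypothetical names made up for illustration. The stand-in UDF alternates between bytes and null on successive calls instead of using Math.random(), so the run is reproducible.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class DoubleEvalSketch {

    // Hypothetical stand-in for the random UDF: returns bytes on odd-numbered
    // calls and null on even-numbered calls, so the result depends on the call
    // count rather than on the input (i.e. it is non-deterministic per row).
    static final class FlipFlopUdf implements Function<Integer, byte[]> {
        int calls = 0;

        @Override
        public byte[] apply(Integer id) {
            calls++;
            return (calls % 2 == 1) ? id.toString().getBytes(StandardCharsets.UTF_8) : null;
        }
    }

    // Assumed plan shape: the UDF expression is duplicated, once in the
    // filter and once in the projection.
    static List<String> inlined(List<Integer> ids, Function<Integer, byte[]> udf) {
        List<String> out = new ArrayList<>();
        for (Integer id : ids) {
            if (udf.apply(id) != null) {          // call 1: WHERE ... IS NOT NULL
                out.add(render(udf.apply(id), id)); // call 2: SELECT udf(id)
            }
        }
        return out;
    }

    // Expected semantics: evaluate the UDF once per row and reuse the value.
    static List<String> evaluatedOnce(List<Integer> ids, Function<Integer, byte[]> udf) {
        List<String> out = new ArrayList<>();
        for (Integer id : ids) {
            byte[] value = udf.apply(id);
            if (value != null) {
                out.add(render(value, id));
            }
        }
        return out;
    }

    // Formats a row roughly the way the print sink output is shown above.
    static String render(byte[] value, Integer id) {
        return "+I[" + (value == null ? "null" : Arrays.toString(value)) + ", " + id + "]";
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5);
        System.out.println("inlined:       " + inlined(ids, new FlipFlopUdf()));
        System.out.println("evaluatedOnce: " + evaluatedOnce(ids, new FlipFlopUdf()));
    }
}
```

With ids 1 to 5, `inlined` emits all five rows with a null first column, while `evaluatedOnce` emits only rows 1, 3 and 5 with their bytes. Whether Flink actually builds the plan this way for the query above is something I have not verified; comparing the EXPLAIN output of the INSERT statement against this sketch would be one way to check.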