OK. The datagen connector with the sequence option reproduces this issue
easily, and it also produced an incorrect result. I generated a sequence
from 1 to 5 with datagen and let the UDF randomly return either null or
bytes. Surprisingly, not only was the UDF executed twice, but the WHERE
clause also failed to apply the `IS NOT NULL` condition. This came as a big
shock to me: `IS NOT NULL` in a WHERE clause is a fundamental SQL feature
and it should not break. I have added my findings to FLINK-29855
<https://issues.apache.org/jira/browse/FLINK-29855>, and here are the repro
steps:

Query:

INSERT INTO print_table
     SELECT * FROM (
           SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable
     )
AS ET WHERE ET.`id_in_bytes` IS NOT NULL


Result:

+I[null, 1]
+I[[50], 2]
+I[null, 4]
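
One way to read the result above, as a sketch only: assume the UDF call is evaluated once for the WHERE predicate and a second time for the SELECT list. That plan shape is my assumption and is not confirmed anywhere in this thread. The class and method names below (DoubleEvalSketch, randomUdf, filterThenProject, evaluateOnce) are hypothetical, and the code is plain Java with no Flink dependency. With a random function, the first call can pass the null check while the second call returns null, so a null row reaches the sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.IntSupplier;

public class DoubleEvalSketch {

    // Stand-in for the UDF: an even draw returns the id as UTF-8 bytes,
    // an odd draw returns null.
    static byte[] randomUdf(int id, IntSupplier draw) {
        return draw.getAsInt() % 2 == 0
                ? Integer.toString(id).getBytes(StandardCharsets.UTF_8)
                : null;
    }

    // Assumed plan shape: one call for the WHERE predicate and a second,
    // independent call for the SELECT list.
    static List<byte[]> filterThenProject(int from, int to, IntSupplier draw) {
        List<byte[]> rows = new ArrayList<>();
        for (int id = from; id <= to; id++) {
            if (randomUdf(id, draw) != null) {   // call 1: IS NOT NULL check
                rows.add(randomUdf(id, draw));   // call 2: projected value, may be null
            }
        }
        return rows;
    }

    // Expected semantics: evaluate once and reuse the value for both steps.
    static List<byte[]> evaluateOnce(int from, int to, IntSupplier draw) {
        List<byte[]> rows = new ArrayList<>();
        for (int id = from; id <= to; id++) {
            byte[] value = randomUdf(id, draw);
            if (value != null) {
                rows.add(value);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        IntSupplier draw = () -> rnd.nextInt(9) + 1; // random integer in 1..9
        for (byte[] b : filterThenProject(1, 5, draw)) {
            System.out.println("evaluated twice: " + Arrays.toString(b));
        }
        for (byte[] b : evaluateOnce(1, 5, draw)) {
            System.out.println("evaluated once:  " + Arrays.toString(b));
        }
    }
}
```

If this reading is right, it may be worth overriding isDeterministic() in the UDF to return false, since Flink treats functions as deterministic by default. I have not verified that this changes the behavior reported here.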

UDF

public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]",
        intputNum, randomNumber);
    if (randomNumber % 2 == 0) {
      LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ",
          results, intputNum);
      return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
  }


Log:

2022-11-02 13:38:56,765 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num
-1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:38:56,766 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num
-1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:38:57,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num
1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:38:57,763 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num
1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:38:58,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num
1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:38:58,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num
1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:38:59,759 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num
-1800690437.
2022-11-02 13:39:00,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num
1428877483.
2022-11-02 13:39:01,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num
-1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:01,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num
-1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:02,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num
-1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:02,762 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num
-1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:03,758 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num
-1663515753.
2022-11-02 13:39:04,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num
-45534429.
2022-11-02 13:39:05,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num
127072449.
2022-11-02 13:39:06,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num
-453705607.
2022-11-02 13:39:07,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num
-1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:07,763 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ###
### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num
-1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ###
###
2022-11-02 13:39:08,760 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num
-1627597417.
2022-11-02 13:39:09,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num
596520501.
2022-11-02 13:39:10,761 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num
1361162843.
2022-11-02 13:39:11,759 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num
2048051791.
2022-11-02 13:39:12,759 INFO
org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ***
*** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num
-306603835.


On Thu, Nov 3, 2022 at 3:04 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> The datagen connector may produce rows with the same values.
>
> From my side, in Flink, a UDF shouldn't process one row twice;
> otherwise, it would be a critical bug.
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Xinyi Yan" <yanxi...@apache.org>
> *To: *"User" <user@flink.apache.org>
> *Sent: *Thursday, November 3, 2022, 6:59:20 AM
> *Subject: *Question about UDF randomly processed input row twice
>
> Hi all,
> I found a weird UDF behavior: a single thread processes some rows in the
> UDF twice; see FLINK-29855 <https://issues.apache.org/jira/browse/FLINK-29855> for
> more details. Basically, I created a datagen table with a random integer (1
> row per second) and passed this value into the UDF. Inside the UDF, I simply
> take the input number modulo 2, convert the integer to a byte array, and log
> it for debugging purposes. As you can see, the UDF has been called twice for
> some of the rows. I'm not sure whether this duplicated UDF call is expected,
> or why it doesn't consistently produce duplicated calls for all rows. In case
> the local environment setup is a concern, I only have 1 task manager and 1
> task slot in my local Flink cluster.
>
> Thanks!
>
> UDF
>
> public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
>     byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
>     if (intputNum % 2 == 0) {
>       LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ",
>           results, intputNum);
>       return results;
>     }
>     LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
>     return null;
>   }
>
>
> Main class DDLs
>
> tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
> tEnv.executeSql("CREATE TABLE datagenTable (\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'datagen',\n" +
>                 "    'number-of-rows' = '100',\n" +
>                 "    'rows-per-second' = '1'\n" +
>                 ")");
> tEnv.executeSql("CREATE TABLE print_table (\n" +
>                 "    id_in_bytes  VARBINARY,\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'print'\n" +
>                 ")");
> tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT " +
>                 "IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE " +
>                 "ET.`id_in_bytes` IS NOT NULL");
>
>
> Logging
>
> 2022-11-02 13:38:58,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num *1506311954*.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:58,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num *1506311954*.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:59,759 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num 
> *-1800690437*.
> 2022-11-02 13:39:00,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num *1428877483*.
> 2022-11-02 13:39:01,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num 
> *-1794263686*.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:01,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num 
> *-1794263686*.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
>
>
>
