[ https://issues.apache.org/jira/browse/FLINK-29855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629175#comment-17629175 ]

Xinyi Yan commented on FLINK-29855:
-----------------------------------

Thanks to [~luoyuxia] for confirming the root cause of the issue. Here is the finding from the email thread:
{code:java}
The execution plan for the SQL `INSERT INTO print_table SELECT * FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL` is:
`
StreamPhysicalSink(table=[default_catalog.default_database.print_table], fields=[id_in_bytes, id])
  StreamPhysicalCalc(select=[RandomUdf(id) AS id_in_bytes, id], where=[IS NOT NULL(RandomUdf(id))])
    StreamPhysicalTableSourceScan(table=[[default_catalog, default_database, datagenTable]], fields=[id])
`
From the plan, we can see that the UDF is called twice in the StreamPhysicalCalc (once in the projection and once in the filter condition), so a single row can be processed twice. {code}
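The plan also explains why only some rows are duplicated. The alias `id_in_bytes` in the outer `WHERE` was replaced by the UDF call itself, so the fused Calc evaluates the UDF once for the condition and again for the projection. Assuming the generated Calc code checks the condition first and computes the projection only for rows that pass (a simplified model, not Flink's actual generated code), a row whose UDF result is null is evaluated once and a row with a non-null result twice. With `IntInputUdf` that means odd ids are logged once and even ids twice, which matches the attached log. The class and method names below (`CalcDoubleCallSketch`, `udf`, `processRow`) are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a Calc with
 *   select=[f(id) AS id_in_bytes, id], where=[IS NOT NULL(f(id))].
 * Assumption: the condition is evaluated first and the projection only for rows
 * that pass, with each expression calling the UDF on its own. Not Flink's generated code.
 */
public class CalcDoubleCallSketch {

    // Invocations per input id, standing in for the UDF's log lines.
    static final Map<Integer, Integer> CALLS = new HashMap<>();

    // Mirrors IntInputUdf: UTF-8 bytes for even inputs, null for odd inputs.
    static byte[] udf(int in) {
        CALLS.merge(in, 1, Integer::sum);
        return in % 2 == 0 ? Integer.toString(in).getBytes(StandardCharsets.UTF_8) : null;
    }

    // Pushes one row through the fused filter + projection; returns null if filtered out.
    static Object[] processRow(int id) {
        if (udf(id) == null) {             // where=[IS NOT NULL(f(id))]: first call
            return null;                   // row dropped, projection never runs
        }
        return new Object[] {udf(id), id}; // select=[f(id) AS id_in_bytes, id]: second call
    }

    public static void main(String[] args) {
        List<Object[]> emitted = new ArrayList<>();
        for (int id = 1; id <= 4; id++) {
            Object[] row = processRow(id);
            if (row != null) {
                emitted.add(row);
            }
        }
        System.out.println("rows emitted: " + emitted.size());
        System.out.println("UDF calls per id: " + CALLS);
    }
}
```

Running `main` should report 2 emitted rows, with one UDF call each for ids 1 and 3 and two calls each for ids 2 and 4.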

> UDF randomly processed input data twice 
> ----------------------------------------
>
>                 Key: FLINK-29855
>                 URL: https://issues.apache.org/jira/browse/FLINK-29855
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.4
>            Reporter: Xinyi Yan
>            Priority: Critical
>         Attachments: IntInputUdf.java, SpendReport.java, example.log
>
>
> Local Flink cluster environment: 1 TaskManager and 1 task slot.
> To reproduce the issue:
>  # Create a datagen table with a single INT column, id, generating 1 row per second.
>  # Create a UDF that only checks whether the input is even (input mod 2) and logs each call.
>  # Create a print table that prints the results.
>  # Insert data from the datagen table into the print table, applying the UDF to the id column.
> The logs show that some of the rows were processed twice, which I assume is not expected. Processing the same row twice can completely change the behavior of the UDF. I also attached the main and UDF classes, as well as the log file, for additional info.
>  
> DDL
>  
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class SpendReport {
>
>     public static void main(String[] args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         // Register the scalar UDF under test.
>         tEnv.executeSql("CREATE FUNCTION IntInputUdf AS " +
>                 "'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
>
>         // Source: 100 random INT ids, emitted at 1 row per second.
>         tEnv.executeSql("CREATE TABLE datagenTable (\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'datagen',\n" +
>                 "    'number-of-rows' = '100',\n" +
>                 "    'rows-per-second' = '1'\n" +
>                 ")");
>
>         // Sink: prints every result row.
>         tEnv.executeSql("CREATE TABLE print_table (\n" +
>                 "    id_in_bytes  VARBINARY,\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'print'\n" +
>                 ")");
>
>         // The UDF appears once in the inner SELECT; the outer WHERE filters on its result.
>         tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT " +
>                 "IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET " +
>                 "WHERE ET.`id_in_bytes` IS NOT NULL");
>     }
> }
> {code}
>  
> UDF
>  
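> {code:java}
> import java.nio.charset.StandardCharsets;
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.functions.ScalarFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class IntInputUdf extends ScalarFunction {
>   private static final Logger LOG = LoggerFactory.getLogger(IntInputUdf.class);
>
>   // Returns the UTF-8 bytes of even inputs and null for odd inputs; logs every call.
>   public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
>     byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
>     if (inputNum % 2 == 0) {
>       LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, inputNum);
>       return results;
>     }
>     LOG.info("*** *** input bytes {} and num {}.", results, inputNum);
>     return null;
>   }
> }
> {code}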
> output
>  
>  
> {code:java}
> 2022-11-02 13:38:56,765 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:56,766 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:57,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:57,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
> 2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
> 2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:02,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:02,762 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:03,758 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
> 2022-11-02 13:39:04,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
> 2022-11-02 13:39:05,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
> 2022-11-02 13:39:06,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
> 2022-11-02 13:39:07,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:07,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:08,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
> 2022-11-02 13:39:09,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
> 2022-11-02 13:39:10,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
> 2022-11-02 13:39:11,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
> 2022-11-02 13:39:12,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835. 
> {code}
>  
>  
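Tallying the `num` values in the `IntInputUdf` log above shows that the duplication is deterministic rather than random. Every even id (non-null result, so the row passes the `IS NOT NULL` filter) appears exactly twice. Every odd id (null result, filtered out) appears exactly once. The following is a small sketch; the class and method names are hypothetical, and the array only copies the logged values in order:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LogCallTally {

    // The `num` values from the IntInputUdf log lines, in log order.
    static final int[] LOGGED_NUMS = {
        -1324836502, -1324836502, 1085456542, 1085456542, 1506311954, 1506311954,
        -1800690437, 1428877483, -1794263686, -1794263686, -1166898542, -1166898542,
        -1663515753, -45534429, 127072449, -453705607, -1095908326, -1095908326,
        -1627597417, 596520501, 1361162843, 2048051791, -306603835
    };

    // Counts how many times each input value was logged, keeping first-seen order.
    static Map<Integer, Integer> tally(int[] nums) {
        Map<Integer, Integer> counts = new LinkedHashMap<>();
        for (int n : nums) {
            counts.merge(n, 1, Integer::sum);
        }
        return counts;
    }

    // True if every even input was logged exactly twice and every odd input exactly once.
    static boolean matchesFilterThenProject(Map<Integer, Integer> counts) {
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            int expected = (e.getKey() % 2 == 0) ? 2 : 1; // odd negatives give -1, so 1 call
            if (e.getValue().intValue() != expected) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = tally(LOGGED_NUMS);
        counts.forEach((n, c) -> System.out.println(n + " -> " + c + " call(s)"));
        System.out.println("even twice, odd once: " + matchesFilterThenProject(counts));
    }
}
```

For the 23 log lines above, this finds 17 distinct ids: 6 even ids logged twice each and 11 odd ids logged once each.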



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
