[ 
https://issues.apache.org/jira/browse/FLINK-29855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628431#comment-17628431
 ] 

Xinyi Yan commented on FLINK-29855:
-----------------------------------

[~lsy], thanks for the reply. I don't think datagen produces duplicate rows 
that often, especially since the rate is already limited to 1 row per second in 
the datagen config.

Together with [~rovo98]'s sequence experiment, this is a clear signal that the 
UDF randomly processes the same input twice.

> UDF randomly processed input data twice 
> ----------------------------------------
>
>                 Key: FLINK-29855
>                 URL: https://issues.apache.org/jira/browse/FLINK-29855
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.4
>            Reporter: Xinyi Yan
>            Priority: Critical
>         Attachments: IntInputUdf.java, SpendReport.java, example.log
>
>
> Local flink cluster env: 
> 1 task manager and 1 task slot.
> To reproduce the issue:
>  # create a datagen table with a single INT column named id that emits 1 row 
> per second.
>  # create a UDF that only performs a modulo check on its input and logs each 
> call.
>  # create a print table that prints the results.
>  # insert into the print table the result of calling the UDF on the id column 
> of the datagen table.
> The log shows that some rows are processed twice, which I believe is not 
> expected. Processing a row twice can completely change the behavior of a 
> UDF. I have also attached the main and UDF classes, as well as the log file, 
> for additional info.
>  
> DDL
>  
> {code:java}
> public static void main(String[] args) throws Exception {
>         EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().build();
>         
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>         
>         tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 
> 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");        
> tEnv.executeSql("CREATE TABLE datagenTable (\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'datagen',\n" +
>                 "    'number-of-rows' = '100',\n" +
>                 "    'rows-per-second' = '1'\n" +
>                 ")");        
> tEnv.executeSql("CREATE TABLE print_table (\n" +
>                 "    id_in_bytes  VARBINARY,\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'print'\n" +
>                 ")");        
> tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT 
> IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE 
> ET.`id_in_bytes` IS NOT NULL");
>     }  {code}
>  
> UDF
>  
> {code:java}
> public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer 
> intputNum) {
>     byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
>     if (intputNum % 2 == 0) {
>       LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### 
> duplicated call??? ### DEBUG  ### ### ", results, intputNum);
>       return results;
>     }
>     LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
>     return null;
>   } {code}
> output
>  
>  
> {code:java}
> 2022-11-02 13:38:56,765 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:56,766 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:57,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:57,763 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:58,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:58,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   
> ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:38:59,759 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
> 2022-11-02 13:39:00,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
> 2022-11-02 13:39:01,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:01,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:02,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:02,762 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:03,758 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
> 2022-11-02 13:39:04,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
> 2022-11-02 13:39:05,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
> 2022-11-02 13:39:06,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
> 2022-11-02 13:39:07,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:07,763 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### 
> input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326. 
>   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
> 2022-11-02 13:39:08,760 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
> 2022-11-02 13:39:09,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
> 2022-11-02 13:39:10,761 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
> 2022-11-02 13:39:11,759 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
> 2022-11-02 13:39:12,759 INFO  
> org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** 
> input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835. 
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to