[ 
https://issues.apache.org/jira/browse/FLINK-25227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17459704#comment-17459704
 ] 

Francesco Guardiani commented on FLINK-25227:
---------------------------------------------

[~TsReaper] In that case, I suggest we focus on hardening the code generator by 
fixing the functions that generate boxed types, rather than introducing new 
functions that generate boxed types and will need fixing later. Can you come up 
with a list of such functions?

For previous versions, I agree that we should just push the fix you proposed in 
https://github.com/apache/flink/pull/18076.

> Comparing the equality of the same (boxed) numeric values returns false
> -----------------------------------------------------------------------
>
>                 Key: FLINK-25227
>                 URL: https://issues.apache.org/jira/browse/FLINK-25227
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0, 1.12.5, 1.13.3
>            Reporter: Caizhi Weng
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.13.6, 1.14.3
>
>
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this 
> bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   val data = Seq(
>     Row.of(
>       java.lang.Integer.valueOf(1000),
>       java.lang.Integer.valueOf(2000),
>       java.lang.Integer.valueOf(1000),
>       java.lang.Integer.valueOf(2000))
>   )
>   tEnv.executeSql(
>     s"""
>        |create table T (
>        |  a int,
>        |  b int,
>        |  c int,
>        |  d int
>        |) with (
>        |  'connector' = 'values',
>        |  'bounded' = 'true',
>        |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
>        |)
>        |""".stripMargin)
>   tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
> }
> {code}
> The result is false, which is obviously incorrect.
> This is caused by the generated Java code:
> {code:java}
> public class StreamExecCalc$8 extends org.apache.flink.table.runtime.operators.TableStreamOperator
>         implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>     private final Object[] references;
>     org.apache.flink.table.data.BoxedWrapperRowData out =
>             new org.apache.flink.table.data.BoxedWrapperRowData(1);
>     private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
>             new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>     public StreamExecCalc$8(
>             Object[] references,
>             org.apache.flink.streaming.runtime.tasks.StreamTask task,
>             org.apache.flink.streaming.api.graph.StreamConfig config,
>             org.apache.flink.streaming.api.operators.Output output,
>             org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService)
>             throws Exception {
>         this.references = references;
>         this.setup(task, config, output);
>         if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>             ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>                     .setProcessingTimeService(processingTimeService);
>         }
>     }
>     @Override
>     public void open() throws Exception {
>         super.open();
>     }
>     @Override
>     public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element)
>             throws Exception {
>         org.apache.flink.table.data.RowData in1 =
>                 (org.apache.flink.table.data.RowData) element.getValue();
>         int field$0;
>         boolean isNull$0;
>         int field$1;
>         boolean isNull$1;
>         int field$3;
>         boolean isNull$3;
>         int field$4;
>         boolean isNull$4;
>         boolean isNull$6;
>         boolean result$7;
>         isNull$3 = in1.isNullAt(2);
>         field$3 = -1;
>         if (!isNull$3) {
>             field$3 = in1.getInt(2);
>         }
>         isNull$0 = in1.isNullAt(0);
>         field$0 = -1;
>         if (!isNull$0) {
>             field$0 = in1.getInt(0);
>         }
>         isNull$1 = in1.isNullAt(1);
>         field$1 = -1;
>         if (!isNull$1) {
>             field$1 = in1.getInt(1);
>         }
>         isNull$4 = in1.isNullAt(3);
>         field$4 = -1;
>         if (!isNull$4) {
>             field$4 = in1.getInt(3);
>         }
>         out.setRowKind(in1.getRowKind());
>         java.lang.Integer result$2 = field$0;
>         boolean nullTerm$2 = false;
>         if (!nullTerm$2) {
>             java.lang.Integer cur$2 = field$0;
>             if (isNull$0) {
>                 nullTerm$2 = true;
>             } else {
>                 int compareResult = result$2.compareTo(cur$2);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$2 = cur$2;
>                 }
>             }
>         }
>         if (!nullTerm$2) {
>             java.lang.Integer cur$2 = field$1;
>             if (isNull$1) {
>                 nullTerm$2 = true;
>             } else {
>                 int compareResult = result$2.compareTo(cur$2);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$2 = cur$2;
>                 }
>             }
>         }
>         if (nullTerm$2) {
>             result$2 = null;
>         }
>         java.lang.Integer result$5 = field$3;
>         boolean nullTerm$5 = false;
>         if (!nullTerm$5) {
>             java.lang.Integer cur$5 = field$3;
>             if (isNull$3) {
>                 nullTerm$5 = true;
>             } else {
>                 int compareResult = result$5.compareTo(cur$5);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$5 = cur$5;
>                 }
>             }
>         }
>         if (!nullTerm$5) {
>             java.lang.Integer cur$5 = field$4;
>             if (isNull$4) {
>                 nullTerm$5 = true;
>             } else {
>                 int compareResult = result$5.compareTo(cur$5);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$5 = cur$5;
>                 }
>             }
>         }
>         if (nullTerm$5) {
>             result$5 = null;
>         }
>         isNull$6 = nullTerm$2 || nullTerm$5;
>         result$7 = false;
>         if (!isNull$6) {
>             result$7 = result$2 == result$5;
>         }
>         if (isNull$6) {
>             out.setNullAt(0);
>         } else {
>             out.setBoolean(0, result$7);
>         }
>         output.collect(outElement.replace(out));
>     }
>     @Override
>     public void close() throws Exception {
>         super.close();
>     }
> }
> {code}
> You can see that line 137 of the generated code, the statement 
> {{result$7 = result$2 == result$5;}}, compares two boxed Integer values with 
> {{==}} (reference equality) instead of {{.equals}} (value equality), which 
> causes this problem.
> In older Flink versions, where the return types of {{cast}} functions are also 
> boxed types, casting strings to numeric values is also affected by this bug.
> Currently, as a quick fix, we can rewrite the generated code. For a long-term 
> solution, however, we shouldn't use boxed types as internal data structures.
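
The root cause described above can be reproduced outside Flink. The following is a minimal standalone sketch, not code taken from Flink or from the proposed fix: the class {{BoxedEqualityDemo}} and all of its method names are hypothetical, and {{greatestBoxed}} only imitates the shape of the generated {{greatest}} logic. It assumes a JVM running with the default Integer cache (-128 to 127), so autoboxing 2000 twice yields two distinct objects.

```java
public class BoxedEqualityDemo {

    // Imitates the generated code: the running result is a boxed Integer.
    static Integer greatestBoxed(int a, int b) {
        Integer result = a; // autoboxing, equivalent to Integer.valueOf(a)
        Integer cur = b;
        if (result.compareTo(cur) < 0) {
            result = cur;
        }
        return result;
    }

    // What the generated code does today: compares object references.
    static boolean equalByReference(Integer left, Integer right) {
        return left == right;
    }

    // Shape of a quick fix: compare the boxed values with equals().
    static boolean equalByValue(Integer left, Integer right) {
        return left.equals(right);
    }

    // Shape of a long-term fix: keep primitives, so == compares values.
    static boolean equalPrimitive(int left, int right) {
        return left == right;
    }

    public static void main(String[] args) {
        Integer x = greatestBoxed(1000, 2000);
        Integer y = greatestBoxed(1000, 2000);

        // Both hold 2000, but in two different Integer objects.
        System.out.println("reference ==  : " + equalByReference(x, y));
        System.out.println("equals()      : " + equalByValue(x, y));
        System.out.println("primitive ==  : " + equalPrimitive(x, y));

        // Small values come from the Integer cache, which hides the bug.
        Integer p = greatestBoxed(1, 2);
        Integer q = greatestBoxed(1, 2);
        System.out.println("cached values : " + equalByReference(p, q));
    }
}
```

The last case suggests why a test with small literals would not catch this: values inside the cache range are boxed to the same object, so the reference comparison happens to succeed. This is presumably why the reproduction above uses 1000 and 2000.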



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
