Caizhi Weng created FLINK-25227:
-----------------------------------
Summary: Comparing the equality of the same (boxed) numeric values
returns false
Key: FLINK-25227
URL: https://issues.apache.org/jira/browse/FLINK-25227
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Caizhi Weng
Fix For: 1.15.0, 1.14.1, 1.13.4
Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.
{code:scala}
// Reproduces FLINK-25227: both sides of the equality below evaluate to 2000,
// yet the query prints false.
@Test
def myTest(): Unit = {
// A single row (a, b, c, d) = (1000, 2000, 1000, 2000), supplied as boxed Integers.
val data = Seq(
Row.of(
java.lang.Integer.valueOf(1000),
java.lang.Integer.valueOf(2000),
java.lang.Integer.valueOf(1000),
java.lang.Integer.valueOf(2000))
)
// Register the row with the 'values' connector as the bounded table T.
tEnv.executeSql(
s"""
|create table T (
| a int,
| b int,
| c int,
| d int
|) with (
| 'connector' = 'values',
| 'bounded' = 'true',
| 'data-id' = '${TestValuesTableFactory.registerData(data)}'
|)
|""".stripMargin)
// greatest(a, b) and greatest(c, d) are both 2000, so the expected output is true.
tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
}
{code}
The result is false, which is obviously incorrect.
This is caused by the generated java code:
{code:java}
// Operator code generated for "select greatest(a, b) = greatest(c, d) from T".
// Quoted verbatim from the code generator; the faulty comparison is marked with a
// BUG comment inside processElement.
public class StreamExecCalc$8 extends
org.apache.flink.table.runtime.operators.TableStreamOperator
implements
org.apache.flink.streaming.api.operators.OneInputStreamOperator {
private final Object[] references;
// Reusable single-column output row holding the boolean result.
org.apache.flink.table.data.BoxedWrapperRowData out =
new org.apache.flink.table.data.BoxedWrapperRowData(1);
private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord
outElement =
new
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
public StreamExecCalc$8(
Object[] references,
org.apache.flink.streaming.runtime.tasks.StreamTask task,
org.apache.flink.streaming.api.graph.StreamConfig config,
org.apache.flink.streaming.api.operators.Output output,
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService
processingTimeService)
throws Exception {
this.references = references;
this.setup(task, config, output);
if (this instanceof
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
((org.apache.flink.streaming.api.operators.AbstractStreamOperator)
this)
.setProcessingTimeService(processingTimeService);
}
}
@Override
public void open() throws Exception {
super.open();
}
// Evaluates the projection for one input row and emits a one-column result row.
@Override
public void
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
element)
throws Exception {
org.apache.flink.table.data.RowData in1 =
(org.apache.flink.table.data.RowData) element.getValue();
// Primitive holders and null flags for the four input columns, plus the result.
int field$0;
boolean isNull$0;
int field$1;
boolean isNull$1;
int field$3;
boolean isNull$3;
int field$4;
boolean isNull$4;
boolean isNull$6;
boolean result$7;
// Read each column; -1 is only a placeholder used when the column is null.
isNull$3 = in1.isNullAt(2);
field$3 = -1;
if (!isNull$3) {
field$3 = in1.getInt(2);
}
isNull$0 = in1.isNullAt(0);
field$0 = -1;
if (!isNull$0) {
field$0 = in1.getInt(0);
}
isNull$1 = in1.isNullAt(1);
field$1 = -1;
if (!isNull$1) {
field$1 = in1.getInt(1);
}
isNull$4 = in1.isNullAt(3);
field$4 = -1;
if (!isNull$4) {
field$4 = in1.getInt(3);
}
out.setRowKind(in1.getRowKind());
// Left operand: greatest of columns 0 and 1. The int fields are autoboxed into
// java.lang.Integer here, so result$2 ends up as a boxed value.
java.lang.Integer result$2 = field$0;
boolean nullTerm$2 = false;
if (!nullTerm$2) {
java.lang.Integer cur$2 = field$0;
if (isNull$0) {
nullTerm$2 = true;
} else {
int compareResult = result$2.compareTo(cur$2);
if ((true && compareResult < 0) || (compareResult > 0 &&
!true)) {
result$2 = cur$2;
}
}
}
if (!nullTerm$2) {
java.lang.Integer cur$2 = field$1;
if (isNull$1) {
nullTerm$2 = true;
} else {
int compareResult = result$2.compareTo(cur$2);
if ((true && compareResult < 0) || (compareResult > 0 &&
!true)) {
result$2 = cur$2;
}
}
}
// A null operand makes the whole left-hand result null.
if (nullTerm$2) {
result$2 = null;
}
// Right operand: greatest of columns 2 and 3, built the same way and also boxed.
java.lang.Integer result$5 = field$3;
boolean nullTerm$5 = false;
if (!nullTerm$5) {
java.lang.Integer cur$5 = field$3;
if (isNull$3) {
nullTerm$5 = true;
} else {
int compareResult = result$5.compareTo(cur$5);
if ((true && compareResult < 0) || (compareResult > 0 &&
!true)) {
result$5 = cur$5;
}
}
}
if (!nullTerm$5) {
java.lang.Integer cur$5 = field$4;
if (isNull$4) {
nullTerm$5 = true;
} else {
int compareResult = result$5.compareTo(cur$5);
if ((true && compareResult < 0) || (compareResult > 0 &&
!true)) {
result$5 = cur$5;
}
}
}
// A null operand makes the whole right-hand result null.
if (nullTerm$5) {
result$5 = null;
}
// The equality is null when either side is null.
isNull$6 = nullTerm$2 || nullTerm$5;
result$7 = false;
if (!isNull$6) {
// BUG (FLINK-25227): result$2 and result$5 are java.lang.Integer objects, so ==
// compares object references rather than numeric values. Two separately boxed
// Integers holding 2000 are not guaranteed to be the same object, which is why
// the query yields false. A value comparison (e.g. equals) is needed here.
result$7 = result$2 == result$5;
}
if (isNull$6) {
out.setNullAt(0);
} else {
out.setBoolean(0, result$7);
}
output.collect(outElement.replace(out));
}
@Override
public void close() throws Exception {
super.close();
}
}
{code}
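You can see that the statement {{result$7 = result$2 == result$5;}} (line 137
of the generated code) compares two boxed {{Integer}} values with {{==}}
instead of {{.equals}}. {{==}} compares object references rather than numeric
values, which causes this problem.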
In older Flink versions, where the return types of {{cast}} functions are also
boxed types, casting strings to numeric values is also affected by this bug.
Currently, as a quick fix, we can rewrite the generated code. For a long-term
solution, however, we shouldn't use boxed types as internal data structures.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)