Roman Lupiichuk created FLINK-33110:
---------------------------------------
Summary: Array content gets replaced with last element duplicates
Key: FLINK-33110
URL: https://issues.apache.org/jira/browse/FLINK-33110
Project: Flink
Issue Type: Bug
Affects Versions: 1.15.4
Reporter: Roman Lupiichuk
After upgrading from Flink 1.14.0 to 1.15.4, a number of our tests started to fail.
I've stripped down one of the failing tests to the following (it's in Kotlin):
{code:java}
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.types.Row
import org.junit.jupiter.api.Test

// Removing NOT NULL from this output hint makes the issue go away.
@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
object TestArrayFunc : ScalarFunction() {
    // Converts each MAP argument into a single-field Row.
    @Suppress("UNCHECKED_CAST")
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
        values
            .map { data ->
                val casted = data as Map<String, String?>
                Row.of(casted["fieldName"])
            }
            .toTypedArray()
}

class ArrayFieldTest {
    @Test
    fun test() {
        val tableEnv = TableEnvironment.create(
            Configuration().also {
                it.setString("table.exec.resource.default-parallelism", "1")
            },
        )
        tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)

        // Source: a single insert-only row with id = "123".
        val dataId = TestValuesTableFactory.registerData(
            listOf(
                TestValuesTableFactory.changelogRow(
                    "+I",
                    "123"
                )
            )
        )
        tableEnv.executeSql(
            """
            CREATE TABLE events
            (
                id STRING
            ) WITH (
                'connector' = 'values',
                'data-id' = '$dataId'
            )
            """
        )

        // Sink: removing the TIMESTAMP column from this table also makes the issue go away.
        tableEnv.executeSql(
            """
            CREATE TABLE results
            (
                fields ARRAY<ROW<fieldName STRING>>,
                event_time TIMESTAMP
            ) WITH (
                'connector' = 'print'
            )
            """
        )
        tableEnv.executeSql(
            """
            INSERT INTO results (fields, event_time)
            SELECT
                TO_FIELDS_ARRAY(
                    MAP['fieldName', 'foo'],
                    MAP['fieldName', 'hello']
                ),
                NOW()
            FROM events
            """
        )
    }
}
{code}
In Flink 1.14.0 it produces:
{code:java}
+I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]{code}
That is the correct, expected output.
But in Flink 1.15.4 the output is:
{code:java}
+I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]{code}
As one can see, every element in the array was replaced with a duplicate of the last element.
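One plausible way to get exactly this symptom (an assumption, not a confirmed root cause for FLINK-33110) is object reuse: if a converter writes each element into the same mutable row instance and stores that reference in every array slot, the array ends up holding N references to the last value. The stand-alone Kotlin sketch below uses hypothetical names (`MutableRecord`, `convertReusing`, `convertCopying`) that do not exist in Flink; it only contrasts the reuse pattern with allocating a fresh instance per element.

{code:java}
// Hypothetical illustration only: this is NOT Flink code and not a confirmed
// cause of FLINK-33110. It shows how reusing one mutable record while filling
// an array yields copies of the last element.
class MutableRecord(var fieldName: String? = null) {
    // Mimics the "+I[...]" row format printed by the print connector.
    override fun toString() = "+I[$fieldName]"
}

// Buggy pattern: one record instance is overwritten for every input value,
// and the same reference is stored in each array slot.
fun convertReusing(values: List<String>): Array<MutableRecord> {
    val reused = MutableRecord()
    return Array(values.size) { i ->
        reused.fieldName = values[i]
        reused
    }
}

// Correct pattern: each array slot gets its own instance (or a defensive copy).
fun convertCopying(values: List<String>): Array<MutableRecord> =
    Array(values.size) { i -> MutableRecord(values[i]) }
{code}

With the inputs from the test above, `convertReusing(listOf("foo", "hello")).contentToString()` returns `[+I[hello], +I[hello]]`, matching the 1.15.4 output, while `convertCopying` returns `[+I[foo], +I[hello]]`, matching the 1.14.0 output.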
The issue goes away if I either:
# remove the NOT NULL constraint from the function hint, or
# remove the TIMESTAMP field from the sink table.
The issue also does not occur on a regular Flink cluster, only on the MiniCluster used in testing.