Roman Lupiichuk created FLINK-33110:
---------------------------------------

             Summary: Array content gets replaced with last element duplicates
                 Key: FLINK-33110
                 URL: https://issues.apache.org/jira/browse/FLINK-33110
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.15.4
            Reporter: Roman Lupiichuk


After upgrading from Flink 1.14.0 to 1.15.4, a number of our tests started to fail.

I've stripped down one of the failing tests to the following (it's in Kotlin):
{code:java}
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.types.Row
import org.junit.jupiter.api.Test

@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
object TestArrayFunc : ScalarFunction() {
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
        values
            .map { data ->
                val casted = data as Map<String, String?>
                Row.of(casted["fieldName"])
            }
            .toTypedArray()
}

class ArrayFieldTest {
    @Test
    fun test() {
        val tableEnv = TableEnvironment.create(
            Configuration().also {
                it.setString("table.exec.resource.default-parallelism", "1")
            },
        )
        tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)

        val dataId = TestValuesTableFactory.registerData(
            listOf(
                TestValuesTableFactory.changelogRow(
                    "+I",
                    "123"
                )
            )
        )
        tableEnv.executeSql(
            """
                CREATE TABLE events
                (
                    id STRING
                ) WITH (
                    'connector' = 'values',
                    'data-id' = '$dataId'
                )
            """
        )
        tableEnv.executeSql(
            """
                CREATE TABLE results
                (
                    fields ARRAY<ROW<fieldName STRING>>,
                    event_time TIMESTAMP
                ) WITH (
                    'connector' = 'print'
                )
            """
        )

        tableEnv.executeSql(
            """
                INSERT INTO results (fields, event_time)
                SELECT
                    TO_FIELDS_ARRAY(
                       MAP['fieldName', 'foo'],
                       MAP['fieldName', 'hello']
                    ),
                    NOW()
                FROM events
            """
        )
    }
}
 {code}
In Flink 1.14.0 it produces:
{code:java}
+I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]{code}
That's the correct and expected output.

But in Flink 1.15.4 the output is:
{code:java}
+I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]{code}
As one can see, all elements in the array were replaced with the last element.

The issue goes away if I
 # either remove the NOT NULL constraint from the function hint
 # or remove the TIMESTAMP field from the sink table
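
For reference, the first workaround amounts to the change sketched below. The only difference from the function in the test is that NOT NULL is dropped from the output hint; the name TestArrayFuncNullable is hypothetical and used only to tell the two variants apart.
{code:java}
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Same function as in the test; NOT NULL is removed from the output hint.
// TestArrayFuncNullable is a hypothetical name, not part of the original test.
@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING>>"))
object TestArrayFuncNullable : ScalarFunction() {
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
        values
            .map { data ->
                @Suppress("UNCHECKED_CAST")
                val casted = data as Map<String, String?>
                Row.of(casted["fieldName"])
            }
            .toTypedArray()
}
{code}
Registering this variant as TO_FIELDS_ARRAY in the test above is the change that makes the issue go away.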

There is also no issue on a regular Flink cluster, only in the MiniCluster, which is used in testing.



