我定义了一个kafka来源的table,sql查询时调了自定义函数, 但是发现参数不能被正确传递给自定义函数eval. 我用的flink版本是1.10.0.
l json 的ddl如下: private static final String personKafkaTable = "CREATE TABLE hw_person_normal_t(\n" + " data ARRAY<ROW<byteSize STRING,columnName STRING,rawData STRING,type STRING>>,\n" + " key STRING,\n" + " operation STRING\n" + ") with (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'HR_SALARY_FLINK_TEST',\n" + "'connector.properties.zookeeper.connect' = 'xxx',\n" + "'connector.properties.bootstrap.servers' = 'xxx',\n" + " 'connector.properties.group.id' = 'salaryGroup',\n" + " 'format.type' = 'json'\n" + ")"; l sql查询中调用了自定义函数如下: Table tempTable = tEnv.sqlQuery("select data from hw_person_normal_t") .joinLateral("ParserJsonFunc(data) as (personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType)") .select("personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType"); l 调试时发现自定义函数 eval 传递过来的value参数有7条,但是每条数据的都是空。 自定义function函数如下: public class ParserJsonPersonNormalFunc extends TableFunction<Row> { private static final Logger log = LoggerFactory.getLogger(ParserJsonPersonNormalFunc.class); public void eval(Row[] value) { try { log.info("eval start"); collector.collect(Row.of(value)); } catch (Exception e) { log.error("parser json failed :", e); } } @Override public TypeInformation<Row> getResultType() { return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING); } } 代码里注册了function: tEnv.sqlUpdate(personKafkaTable); tEnv.registerFunction("ParserJsonFunc", new ParserJsonPersonNormalFunc()); 消息体格式如下: { "beforeData": [], "byteSize": 272, "columnNumber": 32, "data": [{ "byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG" }, { "byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.000000", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "UP_AMOUNT", "rawData": "1000.000000", "type": "DOUBLE" }, { "byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING" }, { "byteSize": 32, "columnName": "EXCHANGE_RATE", "rawData": "1.000000000000000000000000000000", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "DEDUCTED_ACCOUNT", "rawData": "1000.000000", "type": "DOUBLE" }, { "byteSize": 1, "columnName": "ENTER_AT_PROCESS", "rawData": "Y", "type": "STRING" }], "dataCount": 0, "dataMetaData": { "connector": "mysql", "pos": 1000368076, "row": 0, "ts_ms": 1625565737000, "snapshot": "false", "db": "testdb", "table": "flow_person_t" }, "key": "APPLY_PERSON_ID", "memorySize": 1120, "operation": "insert", "rowIndex": -1, "timestamp": "1970-01-01 00:00:00" }