Dian Fu created FLINK-17093:
-------------------------------
Summary: Python UDF doesn't work when the input column is of
composite type
Key: FLINK-17093
URL: https://issues.apache.org/jira/browse/FLINK-17093
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.10.0
Reporter: Dian Fu
Assignee: Dian Fu
Fix For: 1.10.1, 1.11.0
For the following job:
{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import BatchTableEnvironment, StreamTableEnvironment,
EnvironmentSettings, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
import os
@udf(
    input_types=[DataTypes.STRING() for _ in range(4)],
    result_type=DataTypes.STRING(),
)
def get_host_ip(source, qr, sip, dip):
    """Choose between ``sip`` and ``dip`` for the host IP column.

    Returns ``dip`` when ``source`` is "NGAF" and ``qr`` is '1';
    in every other case returns ``sip``.
    """
    ngaf_with_qr_set = source == "NGAF" and qr == '1'
    return dip if ngaf_with_qr_set else sip
@udf(
    input_types=[DataTypes.STRING() for _ in range(4)],
    result_type=DataTypes.STRING(),
)
def get_dns_server_ip(source, qr, sip, dip):
    """Choose between ``sip`` and ``dip`` for the DNS server IP column.

    Returns ``sip`` when ``source`` is "NGAF" and ``qr`` is '1';
    in every other case returns ``dip``.
    """
    ngaf_with_qr_set = source == "NGAF" and qr == '1'
    return sip if ngaf_with_qr_set else dip
def test_case():
    """Run the FLINK-17093 reproduction job.

    Builds a single-row table whose ``data`` column is a composite ROW,
    flattens that column with ``data.*``, applies the two Python UDFs to the
    flattened columns, and writes the result to a CSV sink.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env)

    from pyflink.table import Row

    # One input record: a plain string column plus a nested row column.
    table = t_env.from_elements(
        [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip",
                     qr="qr", queries="queries", answers="answers",
                     qtypes="qtypes", atypes="atypes", rcode="rcode",
                     ts="ts",))],
        DataTypes.ROW([
            DataTypes.FIELD("stype", DataTypes.STRING()),
            DataTypes.FIELD("data", DataTypes.ROW([
                DataTypes.FIELD('source', DataTypes.STRING()),
                DataTypes.FIELD("devid", DataTypes.STRING()),
                DataTypes.FIELD('sip', DataTypes.STRING()),
                DataTypes.FIELD('dip', DataTypes.STRING()),
                DataTypes.FIELD("qr", DataTypes.STRING()),
                DataTypes.FIELD("queries", DataTypes.STRING()),
                DataTypes.FIELD("answers", DataTypes.STRING()),
                DataTypes.FIELD("qtypes", DataTypes.STRING()),
                DataTypes.FIELD("atypes", DataTypes.STRING()),
                DataTypes.FIELD("rcode", DataTypes.STRING()),
                DataTypes.FIELD("ts", DataTypes.STRING()),
            ])),
        ]))

    # Start from a clean output file so earlier runs do not interfere.
    result_file = "/tmp/test.csv"
    if os.path.exists(result_file):
        os.remove(result_file)

    # 11 flattened ``data`` fields + dns_type + host_ip + dns_server_ip = 14
    # string columns. The types are derived from the names so the two lists
    # cannot get out of step.
    sink_field_names = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
                        'k', 'l', 'm', 'n']
    sink_field_types = [DataTypes.STRING() for _ in sink_field_names]
    t_env.register_table_sink(
        "Results",
        CsvTableSink(sink_field_names, sink_field_types, result_file))

    t_env.register_function("get_host_ip", get_host_ip)
    t_env.register_function("get_dns_server_ip", get_dns_server_ip)

    t_env.register_table("source", table)

    # Flatten the composite column, then keep only the DNS record types.
    standard_table = t_env.sql_query(
        "select data.*, stype as dns_type from source") \
        .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')")
    t_env.register_table("standard_table", standard_table)

    # Both UDFs take columns that originate from the flattened composite
    # field; this is the query that produces the incorrect plan.
    final_table = t_env.sql_query(
        "SELECT *, get_host_ip(source, qr, sip, dip) as host_ip,"
        "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip "
        "FROM standard_table")
    final_table.insert_into("Results")

    t_env.execute("test")
# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    test_case()
{code}
The resulting plan, shown below, is not correct:
{code}
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL',
_UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select:
(data, type) -> select: (type, get_host_ip(type.source, type.qr, type.sip,
type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS
f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip
AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS
qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type,
f0 AS host_ip, f1 AS dns_server_ip) -> to: Row -> Sink: KafkaTableSink(source,
devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type,
host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from
RUNNING to FAILED.
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)