Dian Fu created FLINK-17093:
-------------------------------

             Summary: Python UDF doesn't work when the input column is of 
composite type
                 Key: FLINK-17093
                 URL: https://issues.apache.org/jira/browse/FLINK-17093
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.10.0
            Reporter: Dian Fu
            Assignee: Dian Fu
             Fix For: 1.10.1, 1.11.0


For the following job:
{code}

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, 
EnvironmentSettings, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
import os

@udf(
    input_types=[
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING(),
    ],
    result_type=DataTypes.STRING(),
)
def get_host_ip(source, qr, sip, dip):
    """Select which of the two addresses is reported as the host IP.

    Returns ``dip`` when ``source`` is "NGAF" and ``qr`` is '1';
    in every other case returns ``sip``.
    """
    use_dip = source == "NGAF" and qr == '1'
    return dip if use_dip else sip

@udf(
    input_types=[
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.STRING(),
    ],
    result_type=DataTypes.STRING(),
)
def get_dns_server_ip(source, qr, sip, dip):
    """Select which of the two addresses is reported as the DNS server IP.

    Returns ``sip`` when ``source`` is "NGAF" and ``qr`` is '1';
    in every other case returns ``dip``.
    """
    use_sip = source == "NGAF" and qr == '1'
    return sip if use_sip else dip

def test_case():
    """Run the reproduction job for FLINK-17093.

    Builds a one-row table whose ``data`` column is a nested ROW, flattens
    it with ``select data.*``, applies the two Python UDFs to the flattened
    columns and writes the result to a CSV sink at ``/tmp/test.csv``.
    """
    # Local import kept from the original snippet.
    from pyflink.table import Row

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env)

    # Schema of the nested "data" column: eleven STRING fields.
    data_type = DataTypes.ROW([
        DataTypes.FIELD('source', DataTypes.STRING()),
        DataTypes.FIELD("devid", DataTypes.STRING()),
        DataTypes.FIELD('sip', DataTypes.STRING()),
        DataTypes.FIELD('dip', DataTypes.STRING()),
        DataTypes.FIELD("qr", DataTypes.STRING()),
        DataTypes.FIELD("queries", DataTypes.STRING()),
        DataTypes.FIELD("answers", DataTypes.STRING()),
        DataTypes.FIELD("qtypes", DataTypes.STRING()),
        DataTypes.FIELD("atypes", DataTypes.STRING()),
        DataTypes.FIELD("rcode", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.STRING()),
    ])
    table_type = DataTypes.ROW([
        DataTypes.FIELD("stype", DataTypes.STRING()),
        DataTypes.FIELD("data", data_type),
    ])

    record = Row(
        source="source", devid="devid", sip="sip", dip="dip", qr="qr",
        queries="queries", answers="answers", qtypes="qtypes",
        atypes="atypes", rcode="rcode", ts="ts",
    )
    table = t_env.from_elements([("DNS", record)], table_type)

    # Remove output left over from a previous run before registering the sink.
    result_file = "/tmp/test.csv"
    if os.path.exists(result_file):
        os.remove(result_file)

    # 11 flattened data fields + dns_type + host_ip + dns_server_ip = 14 columns.
    sink_field_names = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k',
                        'l', 'm', 'n']
    sink_field_types = [DataTypes.STRING() for _ in sink_field_names]
    t_env.register_table_sink(
        "Results",
        CsvTableSink(sink_field_names, sink_field_types, result_file))

    t_env.register_function("get_host_ip", get_host_ip)
    t_env.register_function("get_dns_server_ip", get_dns_server_ip)

    t_env.register_table("source", table)
    flattened = t_env.sql_query(
        "select data.*, stype as dns_type from source")
    standard_table = flattened.where(
        "dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')")
    t_env.register_table("standard_table", standard_table)

    # The two adjacent literals concatenate into one SQL statement; the text
    # is the same as in the original report with the mail line-wrap removed.
    final_table = t_env.sql_query(
        "SELECT *, get_host_ip(source, qr, sip, dip) as host_ip,"
        "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip "
        "FROM standard_table")

    final_table.insert_into("Results")

    t_env.execute("test")


# Script entry point: run the reproduction job when executed directly.
if __name__ == '__main__':
    test_case()
{code}

The plan is as follows, which is not correct:
{code}
 org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: 
KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL', 
_UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select: 
(data, type) -> select: (type, get_host_ip(type.source, type.qr, type.sip, 
type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS 
f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip 
AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS 
qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type, 
f0 AS host_ip, f1 AS dns_server_ip) -> to: Row -> Sink: KafkaTableSink(source, 
devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type, 
host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from 
RUNNING to FAILED.
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to