[ 
https://issues.apache.org/jira/browse/FLINK-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083102#comment-17083102
 ] 

Dian Fu edited comment on FLINK-17093 at 4/14/20, 10:51 AM:
------------------------------------------------------------

[~chesnay] I'm not sure whether the test failure is related to the change in this 
JIRA. Could you tell me which branch this test failure is from: release-1.10 or master?


was (Author: dian.fu):
[~chesnay] I'm not sure. Could you tell me which branch this test failure is from: 
release-1.10 or master?

> Python UDF doesn't work when the input column is from composite field
> ---------------------------------------------------------------------
>
>                 Key: FLINK-17093
>                 URL: https://issues.apache.org/jira/browse/FLINK-17093
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.10.0
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.1, 1.11.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, \
>     EnvironmentSettings, CsvTableSink
> from pyflink.table.descriptors import Schema, Kafka, Json
> from pyflink.table import DataTypes
> from pyflink.table.udf import ScalarFunction, udf
> import os
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()],
>      result_type=DataTypes.STRING())
> def get_host_ip(source, qr, sip, dip):
>     if source == "NGAF" and qr == '1':
>         return dip
>     return sip
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()],
>      result_type=DataTypes.STRING())
> def get_dns_server_ip(source, qr, sip, dip):
>     if source == "NGAF" and qr == '1':
>         return sip
>     return dip
>
>
> def test_case():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     t_env = StreamTableEnvironment.create(env)
>
>     from pyflink.table import Row
>     table = t_env.from_elements(
>         [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip",
>                      qr="qr", queries="queries", answers="answers",
>                      qtypes="qtypes", atypes="atypes", rcode="rcode", ts="ts"))],
>         DataTypes.ROW([
>             DataTypes.FIELD("stype", DataTypes.STRING()),
>             DataTypes.FIELD("data", DataTypes.ROW([
>                 DataTypes.FIELD("source", DataTypes.STRING()),
>                 DataTypes.FIELD("devid", DataTypes.STRING()),
>                 DataTypes.FIELD("sip", DataTypes.STRING()),
>                 DataTypes.FIELD("dip", DataTypes.STRING()),
>                 DataTypes.FIELD("qr", DataTypes.STRING()),
>                 DataTypes.FIELD("queries", DataTypes.STRING()),
>                 DataTypes.FIELD("answers", DataTypes.STRING()),
>                 DataTypes.FIELD("qtypes", DataTypes.STRING()),
>                 DataTypes.FIELD("atypes", DataTypes.STRING()),
>                 DataTypes.FIELD("rcode", DataTypes.STRING()),
>                 DataTypes.FIELD("ts", DataTypes.STRING())]))]))
>
>     result_file = "/tmp/test.csv"
>     if os.path.exists(result_file):
>         os.remove(result_file)
>
>     t_env.register_table_sink(
>         "Results",
>         CsvTableSink(
>             ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'],
>             [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>              DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>              DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>              DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>              DataTypes.STRING(), DataTypes.STRING()],
>             "/tmp/test.csv"))
>
>     t_env.register_function("get_host_ip", get_host_ip)
>     t_env.register_function("get_dns_server_ip", get_dns_server_ip)
>     t_env.register_table("source", table)
>
>     standard_table = t_env.sql_query(
>         "select data.*, stype as dns_type from source") \
>         .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')")
>     t_env.register_table("standard_table", standard_table)
>
>     final_table = t_env.sql_query(
>         "SELECT *, get_host_ip(source, qr, sip, dip) as host_ip, "
>         "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip "
>         "FROM standard_table")
>     final_table.insert_into("Results")
>     t_env.execute("test")
>
>
> if __name__ == '__main__':
>     test_case()
> {code}
> The generated plan is as follows, which is not correct:
> {code}
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaTableSource(type, data)
>   -> Map
>   -> where: (IN(type, _UTF-16LE'DNSFULL', _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG',
>        _UTF-16LE'DNS_FROM_LOG')), select: (data, type)
>   -> select: (type, get_host_ip(type.source, type.qr, type.sip, type.dip) AS f0,
>        get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS f1)
>   -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip AS dip,
>        f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS qtypes,
>        f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type,
>        f0 AS host_ip, f1 AS dns_server_ip)
>   -> to: Row
>   -> Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, atypes,
>        rcode, ts, dns_type, host_ip, dns_server_ip)
>   (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED.
> {code}
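> For reference, here is a stripped-down sketch of the same access pattern (a composite 
> ROW column flattened with data.* and one of its fields passed into a Python UDF). It is 
> not part of the original job; the table and function names ("src", "flattened", 
> "identity") are illustrative only, and it uses the same 1.10-style API as above. Attach 
> a sink and call t_env.execute() as in the full job to actually run it:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes, Row
> from pyflink.table.udf import udf
>
>
> # Trivial, illustrative UDF; the point is only that its argument comes from a nested ROW field.
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def identity(s):
>     return s
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> t_env.register_function("identity", identity)
>
> # "data" is a composite (ROW) column; "data.*" flattens it into top-level columns.
> t = t_env.from_elements(
>     [("DNS", Row(source="source"))],
>     DataTypes.ROW([
>         DataTypes.FIELD("stype", DataTypes.STRING()),
>         DataTypes.FIELD("data", DataTypes.ROW(
>             [DataTypes.FIELD("source", DataTypes.STRING())]))]))
> t_env.register_table("src", t)
>
> flattened = t_env.sql_query("SELECT data.* FROM src")
> t_env.register_table("flattened", flattened)
>
> # The UDF input column "source" originates from the composite field "data".
> result = t_env.sql_query("SELECT identity(source) FROM flattened")
> {code}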



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
