您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 udf定义如下: @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) def fun(data): b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
如果通过table.select("after.b")或者table.select('after').select('b')也会报错 希望您能给我提供好的解决办法,万分感谢! 在 2020-09-03 22:23:28,"Xingbo Huang" <hxbks...@gmail.com> 写道: >Hi, > >我觉得你从头详细描述一下你的表结构。 >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions > >Best, >Xingbo > ><whh_960...@163.com> 于2020年9月3日周四 下午9:45写道: > >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 >> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 >> 或者正确写法是什么样的,感谢解答! >> >> >> | | >> whh_960101 >> | >> | >> 邮箱:whh_960...@163.com >> | >> >> 签名由 网易邮箱大师 定制 >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: >> Hi, >> input_types定义的是每一个列的具体类型。 >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 >> 正确的写法是 >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) >> input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()), >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", >> DataTypes.STRING())]) >> >> Best, >> Xingbo >> >> whh_960101 <whh_960...@163.com> 于2020年9月3日周四 下午9:03写道: >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid >> > input_type:input_type should be DataType but contain RowField(RECID, >> > VARCHAR) >> > 我的pyflink版本:1.11.1 >>