+1 to support pb format
如果能支持 pb format 那简直太好了,实际了我们也自己搞了一个 pb format。大概的方法也是在外边做了一个对应的service,这个service保存了一个jar,在进行parse byte 的时候,采用了urlclassload+反射调用 parse 类型的方法。 同时也尝试过使用 dynamic message 的方式,这个方式更轻量一些,但是,性能差强人意。 在 2020-06-09 14:49:02,"Jark Wu" <imj...@gmail.com> 写道: >+1 to support pb format. > >On Tue, 9 Jun 2020 at 14:47, Benchao Li <libenc...@apache.org> wrote: > >> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 >> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 >> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 >> >> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, >> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 >> >> 1048262223 <1048262...@qq.com> 于2020年6月9日周二 下午2:23写道: >> >> > Hi >> > >> > >> > >> > >> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 >> > >> > >> > >> > >> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 >> > >> > >> > 如果有理解不对之处,敬请指出。 >> > >> > >> > Best, >> > Yichao Yang >> > >> > >> > >> > >> > ------------------ 原始邮件 ------------------ >> > 发件人: "forideal"<fszw...@163.com>; >> > 发送时间: 2020年6月9日(星期二) 中午1:33 >> > 收件人: "user-zh"<user-zh@flink.apache.org>; >> > >> > 主题: Flink SQL UDF 动态类型 >> > >> > >> > >> > 你好,我的朋友: >> > >> > >> > 我使用的是 Flink 1.10 Blink Planer。 >> > 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 >> > >> > >> > 为什么我想要这个功能: >> > 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 >> > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 >> > 场景2: 我的数据是一个 Json ,问题同上。 >> > >> > 在场景1中,我改了下 Flink 的源码,在 ScalarFunction >> > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 >> > @Override >> > public void initialize(LogicalType[] sqlTypes, String[] paramNames) { >> > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 >> > } >> > 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 >> > 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) >> > 这个类型不进行 cast 是无法直接使用的。 >> > public class TimestampTest extends ScalarFunction { >> > >> > public Object eval(long timestamp, String pattern, int num) { >> > Timestamp timestamp1 = new >> > Timestamp(timestamp); >> > SimpleDateFormat sdf = new SimpleDateFormat(pattern); >> > if (num < 4) { >> > //返回 STRING 类型 >> > return String.valueOf(timestamp); >> > } >> > if (num < 6) { >> > //返回 BIGINT >> > return timestamp - 100; >> > } >> > if (num < 8) { >> > //返回 DOUBLE >> > double ss = 0.9; >> > return >> > (double) timestamp + ss; >> > } >> > //返回 STRING >> > return sdf.format(timestamp1); >> > } >> > } >>