Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
好勒,这种方案已经成功了,非常感谢。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
可以通过SQL的where条件来过滤吧 chuyuan 于2020年9月21日周一 下午6:48写道: > 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
为什么要用DataStream解析之后再注册成table呢? 可以尝试下直接用DDL声明一个source,用内置的json format来解析。 chuyuan 于2020年9月21日周一 下午4:44写道: > 我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例: > { > "properties":{ > "platformType":"APP", > "$os":"iOS", > "$screen_width":414, > "$app_version":"1.0", > "$is_first_day":false, > "$model":"x86_64", > "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", > "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3", > "isLogin":false, > "zrIdfa":"----", > "$network_type":"WIFI", > "$wifi":true, > "$timezone_offset":-480, > "$resume_from_background":false, > "tdid":"", > "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", > "$screen_height":896, > "$lib_version":"2.0.10", > "$lib":"iOS", > "$os_version":"13.4.1", > "$manufacturer":"Apple", > "$is_first_time":false, > "$app_id":"Com.ziroom..ZRSensorsSDK" > }, > "type":"track", > "lib":{ > "$lib_version":"2.0.10", > "$lib":"iOS", > "$app_version":"1.0", > "$lib_method":"autoTrack" > } > } > 其中key为lib和properties的value是Json类型,其中字段可动态追加。 > > > 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map类型), > @Data > public static class CustomBuriedPointDTO { > /** > * 跟踪ID > */ > private Long track_id; > /** > * 事件时间 > */ > private Long event_time; > > /** > * 类型 > */ > private String type; > /** > * 排重后Id > */ > private String distinct_id; > /** > * 匿名ID > */ > private String anonymous_id; > /** > * 包信息 > */ > private @DataTypeHint("RAW") Map lib; > /** > * 事件 > */ > private String event; > /** > * 属性 > */ > // private Map properties; > private @DataTypeHint("RAW") Map > properties; > /** > * 刷新时间 > */ > private Long flush_time; > /** > * 事件日期 > */ > private String dt; > > > /** > * 封装数据对象中字段信息 > */ > public void assembly(CustomBuriedPointDO pointDO) { > // 复制DO属性到DTO > BeanUtils.copyProperties(pointDO, this); > > /* > 转换特殊字段 > */ > // 设置分区日期 > Long eventTimeLong = pointDO.getEvent_time(); > if (eventTimeLong == null) { > eventTimeLong = System.currentTimeMillis(); > } > Date eventTime = new Date(eventTimeLong); > DateFormat dateFormatDate = new > SimpleDateFormat("-MM-dd"); > this.setDt(dateFormatDate.format(eventTime)); > > // json字段转换为Map类型 > Map propertiesMap = null; > if > (StringUtils.isNotBlank(pointDO.getProperties())) > { > propertiesMap = (Map) > JSON.parse(pointDO.getProperties()); > } > this.setProperties(propertiesMap); > Map libMap = null; > if (StringUtils.isNotBlank(pointDO.getLib())) { > libMap = (Map) > JSON.parse(pointDO.getLib()); > } > this.setLib(libMap); > } > } > > 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下: > "CREATE TABLE test.test(" + > " type STRING," + > " lib MAP," > + > " properties > MAP" + > ") PARTITIONED BY (" + > " dt string" + > " ) stored as orcfile " + > " TBLPROPERTIES" + > " (" + > > "'partition.time-extractor.kind'='custom'," + > > "'partition.time-ex
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例: { "properties":{ "platformType":"APP", "$os":"iOS", "$screen_width":414, "$app_version":"1.0", "$is_first_day":false, "$model":"x86_64", "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3", "isLogin":false, "zrIdfa":"----", "$network_type":"WIFI", "$wifi":true, "$timezone_offset":-480, "$resume_from_background":false, "tdid":"", "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "$screen_height":896, "$lib_version":"2.0.10", "$lib":"iOS", "$os_version":"13.4.1", "$manufacturer":"Apple", "$is_first_time":false, "$app_id":"Com.ziroom..ZRSensorsSDK" }, "type":"track", "lib":{ "$lib_version":"2.0.10", "$lib":"iOS", "$app_version":"1.0", "$lib_method":"autoTrack" } } 其中key为lib和properties的value是Json类型,其中字段可动态追加。 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map类型), @Data public static class CustomBuriedPointDTO { /** * 跟踪ID */ private Long track_id; /** * 事件时间 */ private Long event_time; /** * 类型 */ private String type; /** * 排重后Id */ private String distinct_id; /** * 匿名ID */ private String anonymous_id; /** * 包信息 */ private @DataTypeHint("RAW") Map lib; /** * 事件 */ private String event; /** * 属性 */ // private Map properties; private @DataTypeHint("RAW") Map properties; /** * 刷新时间 */ private Long flush_time; /** * 事件日期 */ private String dt; /** * 封装数据对象中字段信息 */ public void assembly(CustomBuriedPointDO pointDO) { // 复制DO属性到DTO BeanUtils.copyProperties(pointDO, this); /* 转换特殊字段 */ // 设置分区日期 Long eventTimeLong = pointDO.getEvent_time(); if (eventTimeLong == null) { eventTimeLong = System.currentTimeMillis(); } Date eventTime = new Date(eventTimeLong); DateFormat dateFormatDate = new SimpleDateFormat("-MM-dd"); this.setDt(dateFormatDate.format(eventTime)); // json字段转换为Map类型 Map propertiesMap = null; if (StringUtils.isNotBlank(pointDO.getProperties())) { propertiesMap = (Map) JSON.parse(pointDO.getProperties()); } this.setProperties(propertiesMap); Map libMap = null; if (StringUtils.isNotBlank(pointDO.getLib())) { libMap = (Map) JSON.parse(pointDO.getLib()); } this.setLib(libMap); } } 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下: "CREATE TABLE test.test(" + " type STRING," + " lib MAP," + " properties MAP" + ") PARTITIONED BY (" + " dt string" + " ) stored as orcfile " + " TBLPROPERTIES" + " (" + "'partition.time-extractor.kind'='custom'," + "'partition.time-extractor.timestamp-pattern'='$dt'," + "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," + "'sink.partition-commit.trigger'='partition-time'," +
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例: { "properties":{ "platformType":"APP", "$os":"iOS", "$screen_width":414, "$app_version":"1.0", "$is_first_day":false, "$model":"x86_64", "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3", "isLogin":false, "zrIdfa":"----", "$network_type":"WIFI", "$wifi":true, "$timezone_offset":-480, "$resume_from_background":false, "tdid":"", "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9", "$screen_height":896, "$lib_version":"2.0.10", "$lib":"iOS", "$os_version":"13.4.1", "$manufacturer":"Apple", "$is_first_time":false, "$app_id":"Com.ziroom..ZRSensorsSDK" }, "type":"track", "lib":{ "$lib_version":"2.0.10", "$lib":"iOS", "$app_version":"1.0", "$lib_method":"autoTrack" } } 其中key为lib和properties的value是Json类型,其中字段可动态追加。 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map类型), @Data public static class CustomBuriedPointDTO { /** * 跟踪ID */ private Long track_id; /** * 事件时间 */ private Long event_time; /** * 类型 */ private String type; /** * 排重后Id */ private String distinct_id; /** * 匿名ID */ private String anonymous_id; /** * 包信息 */ private @DataTypeHint("RAW") Map lib; /** * 事件 */ private String event; /** * 属性 */ // private Map properties; private @DataTypeHint("RAW") Map properties; /** * 刷新时间 */ private Long flush_time; /** * 事件日期 */ private String dt; /** * 封装数据对象中字段信息 */ public void assembly(CustomBuriedPointDO pointDO) { // 复制DO属性到DTO BeanUtils.copyProperties(pointDO, this); /* 转换特殊字段 */ // 设置分区日期 Long eventTimeLong = pointDO.getEvent_time(); if (eventTimeLong == null) { eventTimeLong = System.currentTimeMillis(); } Date eventTime = new Date(eventTimeLong); DateFormat dateFormatDate = new SimpleDateFormat("-MM-dd"); this.setDt(dateFormatDate.format(eventTime)); // json字段转换为Map类型 Map propertiesMap = null; if (StringUtils.isNotBlank(pointDO.getProperties())) { propertiesMap = (Map) JSON.parse(pointDO.getProperties()); } this.setProperties(propertiesMap); Map libMap = null; if (StringUtils.isNotBlank(pointDO.getLib())) { libMap = (Map) JSON.parse(pointDO.getLib()); } this.setLib(libMap); } } 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下: "CREATE TABLE test.test(" + " type STRING," + " lib MAP," + " properties MAP" + ") PARTITIONED BY (" + " dt string" + " ) stored as orcfile " + " TBLPROPERTIES" + " (" + "'partition.time-extractor.kind'='custom'," + "'partition.time-extractor.timestamp-pattern'='$dt'," + "'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," + "'sink.partition-commit.trigger'='partition-time'," +
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
Hi chuyuan, 可以详细描述下你遇到的问题么,比如下面这些信息 - 用的是哪个Flink版本 - SQL(包括DDL和query) - 数据是什么样子的 chuyuan 于2020年9月21日周一 下午2:40写道: > LEGACY('RAW', > 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常: > org.apache.flink.table.api.TableException: A raw type backed by type > information has no serializable string representation. It needs to be > resolved into a proper raw type. > 方便说下具体实现细节吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
LEGACY('RAW', 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. 方便说下具体实现细节吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
Hi, 目前Json Format的实现就是假设json最外层是一个json object,暂时还无法做到顶层的所有字段无限扩展。 如果是在SQL里面,可以直接定义成map类型就可以,比如: ```SQL CREATE TABLE source ( d MAP ) WITH (...) ``` Zhao,Yi(SEC) 于2020年8月11日周二 下午4:58写道: > 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: > > 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 > 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? > > stEnv.connect( > new Kafka() > .properties(TestKafkaUtils.getKafkaProperties()) > .version("universal") > .topic("test") > .startFromLatest() > ).withFormat(new Json() > .failOnMissingField(false) > ).withSchema( > new Schema() > .field("d", TypeInformation.of(Map.class)) > ).inAppendMode().createTemporaryTable("t"); > > 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', > 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? > root > |-- d: LEGACY('RAW', 'ANY') > > > 在 2020/8/11 下午4:23,“zhao liang” 写入: > > Hi,你图挂了,换个图床试试呢 > > 发件人: Zhao,Yi(SEC) > 日期: 星期二, 2020年8月11日 16:04 > 收件人: user-zh@flink.apache.org > 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题 > 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: > [cid:image001.png@01D66FF8.F697E2D0] > 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 > 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? > > > 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', > 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? > root > |-- d: LEGACY('RAW', 'ANY') > > > -- Best, Benchao Li
Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? stEnv.connect( new Kafka() .properties(TestKafkaUtils.getKafkaProperties()) .version("universal") .topic("test") .startFromLatest() ).withFormat(new Json() .failOnMissingField(false) ).withSchema( new Schema() .field("d", TypeInformation.of(Map.class)) ).inAppendMode().createTemporaryTable("t"); 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? root |-- d: LEGACY('RAW', 'ANY') 在 2020/8/11 下午4:23,“zhao liang” 写入: Hi,你图挂了,换个图床试试呢 发件人: Zhao,Yi(SEC) 日期: 星期二, 2020年8月11日 16:04 收件人: user-zh@flink.apache.org 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: [cid:image001.png@01D66FF8.F697E2D0] 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? root |-- d: LEGACY('RAW', 'ANY')
答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题
Hi,你图挂了,换个图床试试呢 发件人: Zhao,Yi(SEC) 日期: 星期二, 2020年8月11日 16:04 收件人: user-zh@flink.apache.org 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: [cid:image001.png@01D66FF8.F697E2D0] 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢? root |-- d: LEGACY('RAW', 'ANY')