Re: Reply: How to create a table schema in Flink SQL for JSON-like data with unbounded, extensible fields

2020-09-22 by chuyuan
Great, that approach worked. Thanks a lot.





Re: Reply: How to create a table schema in Flink SQL for JSON-like data with unbounded, extensible fields

2020-09-21 by chuyuan
Great, thanks, I'll try that approach. I had registered the stream as a table in order to filter the data by condition. One more question: if I use DDL directly, how do I filter the Kafka data?
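For reference, this is the DDL-only shape I have in mind (a rough sketch only; the topic name, connector options and the MAP<STRING, STRING> columns are my guesses for Flink 1.11's Kafka SQL connector, not a verified config): declare the Kafka source with CREATE TABLE and push the condition into a WHERE clause instead of filtering a registered DataStream.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaDdlFilterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Hypothetical Kafka source; lib/properties are declared as maps and filled by the JSON format
        tEnv.executeSql(
                "CREATE TABLE buried_point (" +
                "  `type` STRING," +
                "  lib MAP<STRING, STRING>," +
                "  properties MAP<STRING, STRING>" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'buried_point'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'buried-point'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Filtering is just a WHERE clause on the source table
        Table filtered = tEnv.sqlQuery(
                "SELECT `type`, lib, properties " +
                "FROM buried_point " +
                "WHERE `type` = 'track' AND properties['isLogin'] = 'false'");
    }
}

The same query could then feed the INSERT INTO for the Hive table, so the filter would live entirely in SQL.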




Re: Reply: How to create a table schema in Flink SQL for JSON-like data with unbounded, extensible fields

2020-09-21 by chuyuan
I'm on Flink 1.11.1 locally. Roughly, the job has Flink consuming JSON data from Kafka; a sample record:
{
    "properties":{
        "platformType":"APP",
        "$os":"iOS",
        "$screen_width":414,
        "$app_version":"1.0",
        "$is_first_day":false,
        "$model":"x86_64",
        "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
        "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
        "isLogin":false,
        "zrIdfa":"----",
        "$network_type":"WIFI",
        "$wifi":true,
        "$timezone_offset":-480,
        "$resume_from_background":false,
        "tdid":"",
        "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
        "$screen_height":896,
        "$lib_version":"2.0.10",
        "$lib":"iOS",
        "$os_version":"13.4.1",
        "$manufacturer":"Apple",
        "$is_first_time":false,
        "$app_id":"Com.ziroom..ZRSensorsSDK"
    },
    "type":"track",
    "lib":{
        "$lib_version":"2.0.10",
        "$lib":"iOS",
        "$app_version":"1.0",
        "$lib_method":"autoTrack"
    }
}
The values of the lib and properties keys are themselves JSON objects, and their fields can be appended dynamically.

Step 1: consume the Kafka JSON as a DataStream and use a MapFunction to wrap each JSON string into a DO, then convert it into a DTO so that the lib and properties JSON values become Map fields (a rough sketch of the consuming/mapping code follows the DTO class below):
@Data
public static class CustomBuriedPointDTO {
    /** Tracking id. */
    private Long track_id;
    /** Event time (epoch millis). */
    private Long event_time;
    /** Event type. */
    private String type;
    /** Distinct (deduplicated) id. */
    private String distinct_id;
    /** Anonymous id. */
    private String anonymous_id;
    /** SDK/library info. */
    private @DataTypeHint("RAW") Map<String, Object> lib;
    /** Event name. */
    private String event;
    /** Event properties. */
    // private Map properties;
    private @DataTypeHint("RAW") Map<String, Object> properties;
    /** Flush time. */
    private Long flush_time;
    /** Event date (partition column). */
    private String dt;

    /** Populates this DTO from the raw data object. */
    public void assembly(CustomBuriedPointDO pointDO) {
        // Copy the plain DO fields onto the DTO
        BeanUtils.copyProperties(pointDO, this);

        // Derive the partition date from the event time, falling back to now
        Long eventTimeLong = pointDO.getEvent_time();
        if (eventTimeLong == null) {
            eventTimeLong = System.currentTimeMillis();
        }
        Date eventTime = new Date(eventTimeLong);
        DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd");
        this.setDt(dateFormatDate.format(eventTime));

        // Parse the nested JSON strings into Map fields
        Map<String, Object> propertiesMap = null;
        if (StringUtils.isNotBlank(pointDO.getProperties())) {
            propertiesMap = (Map<String, Object>) JSON.parse(pointDO.getProperties());
        }
        this.setProperties(propertiesMap);
        Map<String, Object> libMap = null;
        if (StringUtils.isNotBlank(pointDO.getLib())) {
            libMap = (Map<String, Object>) JSON.parse(pointDO.getLib());
        }
        this.setLib(libMap);
    }
}
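To make step 1 concrete, this is roughly the shape of the consuming/mapping code (a minimal sketch, not the actual job: the topic, group id, class name and the DO setter names are assumptions based on the getters used in assembly()):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class BuriedPointJob {

    // Step 1: Kafka JSON string -> DO -> DTO stream (step 2 later registers this stream as a table)
    public static DataStream<CustomBuriedPointDTO> buildDtoStream(StreamExecutionEnvironment env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "buried-point");            // placeholder

        // Read the raw JSON strings from Kafka
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("buried_point", new SimpleStringSchema(), props));

        // JSON string -> DO -> DTO (lib/properties become Map fields in the DTO)
        return source.map(new MapFunction<String, CustomBuriedPointDTO>() {
            @Override
            public CustomBuriedPointDTO map(String value) {
                JSONObject message = JSON.parseObject(value);
                CustomBuriedPointDO pointDO = new CustomBuriedPointDO();
                pointDO.setType(message.getString("type"));
                // getString() returns the nested objects as JSON text; assembly() re-parses them into Maps
                pointDO.setLib(message.getString("lib"));
                pointDO.setProperties(message.getString("properties"));
                // ... remaining DO fields are set the same way
                CustomBuriedPointDTO dto = new CustomBuriedPointDTO();
                dto.assembly(pointDO);
                return dto;
            }
        });
    }
}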

Step 2: register the DataStream as a temporary table and write it into the Hive target table (a sketch of this step follows the DDL below). The Hive target table is defined as:
"CREATE TABLE test.test(" +
"  type STRING," +
"  lib MAP,"
+
"  properties
MAP" +
") PARTITIONED BY (" +
"  dt string" +
" ) stored as orcfile " +
" TBLPROPERTIES" +
" (" +
   
"'partition.time-extractor.kind'='custom'," +
   
"'partition.time-extractor.timestamp-pattern'='$dt'," +
   
"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+
   
"'sink.partition-commit.trigger'='partition-time'," +
 
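And step 2 looks roughly like this (again a minimal sketch rather than the real code; the catalog name, hive-conf path and view name are placeholders). This is the spot where the DTO's Map fields come through as LEGACY('RAW', 'ANY') rather than MAP:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class BuriedPointHiveSink {

    // Step 2: register the DTO stream as a temporary view and insert it into the partitioned Hive table
    public static void writeToHive(StreamExecutionEnvironment env,
                                   DataStream<CustomBuriedPointDTO> dtoStream) {
        // Checkpointing must be enabled so that partitions get committed
        env.enableCheckpointing(60_000);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Placeholder catalog name / default database / hive-site.xml directory
        HiveCatalog hiveCatalog = new HiveCatalog("hive", "test", "/path/to/hive-conf");
        tEnv.registerCatalog("hive", hiveCatalog);
        tEnv.useCatalog("hive");

        // The CREATE TABLE above is executed once with the Hive dialect, e.g.:
        // tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // tEnv.executeSql("CREATE TABLE test.test( ... )");
        // tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        tEnv.createTemporaryView("buried_point_tmp", dtoStream);
        tEnv.executeSql(
                "INSERT INTO test.test " +
                "SELECT `type`, lib, properties, dt FROM buried_point_tmp");
    }
}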

Flink consumes JSON data from Kafka where one value is itself JSON; exception when writing it into a Hive table MAP column

2020-09-21 by chuyuan
Hi all, Flink is consuming JSON data from Kafka; a sample record:
{
    "properties":{
        "platformType":"APP",
        "$os":"iOS",
        "$screen_width":414,
        "$app_version":"1.0",
        "$is_first_day":false,
        "$model":"x86_64",
        "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
        "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
        "isLogin":false,
        "zrIdfa":"----",
        "$network_type":"WIFI",
        "$wifi":true,
        "$timezone_offset":-480,
        "$resume_from_background":false,
        "tdid":"",
        "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
        "$screen_height":896,
        "$lib_version":"2.0.10",
        "$lib":"iOS",
        "$os_version":"13.4.1",
        "$manufacturer":"Apple",
        "$is_first_time":false,
        "$app_id":"Com.ziroom..ZRSensorsSDK"
    },
    "type":"track",
    "lib":{
        "$lib_version":"2.0.10",
        "$lib":"iOS",
        "$app_version":"1.0",
        "$lib_method":"autoTrack"
    }
}
The values of the lib and properties keys are themselves JSON objects, and their fields can be appended dynamically.
I wrap the JSON string into a DO and then, to turn the lib and properties values into Maps, convert it into a DTO:
@Data
public static class CustomBuriedPointDTO {
    /** Tracking id. */
    private Long track_id;
    /** Event time (epoch millis). */
    private Long event_time;
    /** Event type. */
    private String type;
    /** Distinct (deduplicated) id. */
    private String distinct_id;
    /** Anonymous id. */
    private String anonymous_id;
    /** SDK/library info. */
    private @DataTypeHint("RAW") Map<String, Object> lib;
    /** Event name. */
    private String event;
    /** Event properties. */
    // private Map properties;
    private @DataTypeHint("RAW") Map<String, Object> properties;
    /** Flush time. */
    private Long flush_time;
    /** Event date (partition column). */
    private String dt;

    /** Populates this DTO from the raw data object. */
    public void assembly(CustomBuriedPointDO pointDO) {
        // Copy the plain DO fields onto the DTO
        BeanUtils.copyProperties(pointDO, this);

        // Derive the partition date from the event time, falling back to now
        Long eventTimeLong = pointDO.getEvent_time();
        if (eventTimeLong == null) {
            eventTimeLong = System.currentTimeMillis();
        }
        Date eventTime = new Date(eventTimeLong);
        DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd");
        this.setDt(dateFormatDate.format(eventTime));

        // Parse the nested JSON strings into Map fields
        Map<String, Object> propertiesMap = null;
        if (StringUtils.isNotBlank(pointDO.getProperties())) {
            propertiesMap = (Map<String, Object>) JSON.parse(pointDO.getProperties());
        }
        this.setProperties(propertiesMap);
        Map<String, Object> libMap = null;
        if (StringUtils.isNotBlank(pointDO.getLib())) {
            libMap = (Map<String, Object>) JSON.parse(pointDO.getLib());
        }
        this.setLib(libMap);
    }
}
Then I registered the DataStream as a temporary Hive table and wrote it into the Hive table, which is defined as:
"CREATE TABLE test.test(" +
"  type STRING," +
"  lib MAP," +
"  properties 
MAP" +
") PARTITIONED BY (" +
"  dt string" +
" ) stored as orcfile " +
" TBLPROPERTIES" +
" (" +

"'partition.time-extractor.kind'='custom'," +

"'partition.time-extractor.timestamp-pattern'='$dt'," +

"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+

"'sink.partition-commit.trigger'='partition-time'," +

"'sink.

Re: Reply: How to create a table schema in Flink SQL for JSON-like data with unbounded, extensible fields

2020-09-20 by chuyuan
I changed the SQL data type that corresponded to LEGACY('RAW', 'ANY') to MAP<STRING,STRING>, but it still fails with:
org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
Could you share the concrete implementation details?
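For context, one direction I am experimenting with (not verified end to end) is to avoid the RAW type altogether: map the DTO to a Row and declare the map columns explicitly through type information before registering the view, roughly like this (field names and order are only an example):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowViewWorkaround {

    // Register the DTO stream as a view whose schema declares MAP<STRING, STRING> explicitly,
    // instead of letting the planner fall back to LEGACY('RAW', 'ANY') for the Map fields.
    public static void register(StreamTableEnvironment tEnv,
                                DataStream<CustomBuriedPointDTO> dtoStream) {
        DataStream<Row> rows = dtoStream
                .map(dto -> Row.of(
                        dto.getType(),
                        stringify(dto.getLib()),
                        stringify(dto.getProperties()),
                        dto.getDt()))
                .returns(Types.ROW_NAMED(
                        new String[]{"type", "lib", "properties", "dt"},
                        Types.STRING,
                        Types.MAP(Types.STRING, Types.STRING),
                        Types.MAP(Types.STRING, Types.STRING),
                        Types.STRING));
        tEnv.createTemporaryView("buried_point_tmp", rows);
    }

    // lib/properties hold mixed value types, so stringify them for the MAP<STRING, STRING> columns
    private static Map<String, String> stringify(Map<String, Object> source) {
        Map<String, String> result = new HashMap<>();
        if (source != null) {
            source.forEach((k, v) -> result.put(k, v == null ? null : String.valueOf(v)));
        }
        return result;
    }
}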





Re: Table api json schema

2020-09-20 by chuyuan
Hello, I've run into a similar problem. Could you share the final solution?


