Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-22 文章 chuyuan
好勒,这种方案已经成功了,非常感谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
可以通过SQL的where条件来过滤吧

chuyuan  于2020年9月21日周一 下午6:48写道:

> 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 chuyuan
好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
为什么要用DataStream解析之后再注册成table呢?
可以尝试下直接用DDL声明一个source,用内置的json format来解析。

chuyuan  于2020年9月21日周一 下午4:44写道:

> 我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
> {
> "properties":{
> "platformType":"APP",
> "$os":"iOS",
> "$screen_width":414,
> "$app_version":"1.0",
> "$is_first_day":false,
> "$model":"x86_64",
> "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
> "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
> "isLogin":false,
> "zrIdfa":"----",
> "$network_type":"WIFI",
> "$wifi":true,
> "$timezone_offset":-480,
> "$resume_from_background":false,
> "tdid":"",
> "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
> "$screen_height":896,
> "$lib_version":"2.0.10",
> "$lib":"iOS",
> "$os_version":"13.4.1",
> "$manufacturer":"Apple",
> "$is_first_time":false,
> "$app_id":"Com.ziroom..ZRSensorsSDK"
> },
> "type":"track",
> "lib":{
> "$lib_version":"2.0.10",
> "$lib":"iOS",
> "$app_version":"1.0",
> "$lib_method":"autoTrack"
> }
> }
> 其中key为lib和properties的value是Json类型,其中字段可动态追加。
>
>
> 第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map类型),
> @Data
> public static class CustomBuriedPointDTO {
> /**
>  * 跟踪ID
>  */
> private Long track_id;
> /**
>  * 事件时间
>  */
> private Long event_time;
>
> /**
>  * 类型
>  */
> private String type;
> /**
>  * 排重后Id
>  */
> private String distinct_id;
> /**
>  * 匿名ID
>  */
> private String anonymous_id;
> /**
>  * 包信息
>  */
> private @DataTypeHint("RAW") Map lib;
> /**
>  * 事件
>  */
> private String event;
> /**
>  * 属性
>  */
> // private Map properties;
> private @DataTypeHint("RAW") Map
> properties;
> /**
>  * 刷新时间
>  */
> private Long flush_time;
> /**
>  * 事件日期
>  */
> private String dt;
>
>
> /**
>  * 封装数据对象中字段信息
>  */
> public void assembly(CustomBuriedPointDO pointDO) {
> // 复制DO属性到DTO
> BeanUtils.copyProperties(pointDO, this);
>
> /*
> 转换特殊字段
>  */
> // 设置分区日期
> Long eventTimeLong = pointDO.getEvent_time();
> if (eventTimeLong == null) {
> eventTimeLong = System.currentTimeMillis();
> }
> Date eventTime = new Date(eventTimeLong);
> DateFormat dateFormatDate = new
> SimpleDateFormat("-MM-dd");
> this.setDt(dateFormatDate.format(eventTime));
>
> // json字段转换为Map类型
> Map propertiesMap = null;
> if
> (StringUtils.isNotBlank(pointDO.getProperties()))
> {
> propertiesMap = (Map)
> JSON.parse(pointDO.getProperties());
> }
> this.setProperties(propertiesMap);
> Map libMap = null;
> if (StringUtils.isNotBlank(pointDO.getLib())) {
> libMap = (Map)
> JSON.parse(pointDO.getLib());
> }
> this.setLib(libMap);
> }
> }
>
> 第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
> "CREATE TABLE test.test(" +
> "  type STRING," +
> "  lib MAP,"
> +
> "  properties
> MAP" +
> ") PARTITIONED BY (" +
> "  dt string" +
> " ) stored as orcfile " +
> " TBLPROPERTIES" +
> " (" +
>
> "'partition.time-extractor.kind'='custom'," +
>
> 

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 chuyuan
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
{
"properties":{
"platformType":"APP",
"$os":"iOS",
"$screen_width":414,
"$app_version":"1.0",
"$is_first_day":false,
"$model":"x86_64",
"$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
"isLogin":false,
"zrIdfa":"----",
"$network_type":"WIFI",
"$wifi":true,
"$timezone_offset":-480,
"$resume_from_background":false,
"tdid":"",
"zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"$screen_height":896,
"$lib_version":"2.0.10",
"$lib":"iOS",
"$os_version":"13.4.1",
"$manufacturer":"Apple",
"$is_first_time":false,
"$app_id":"Com.ziroom..ZRSensorsSDK"
},
"type":"track",
"lib":{
"$lib_version":"2.0.10",
"$lib":"iOS",
"$app_version":"1.0",
"$lib_method":"autoTrack"
}
}
其中key为lib和properties的value是Json类型,其中字段可动态追加。

第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map类型),
@Data
public static class CustomBuriedPointDTO {
/**
 * 跟踪ID
 */
private Long track_id;
/**
 * 事件时间
 */
private Long event_time;

/**
 * 类型
 */
private String type;
/**
 * 排重后Id
 */
private String distinct_id;
/**
 * 匿名ID
 */
private String anonymous_id;
/**
 * 包信息
 */
private @DataTypeHint("RAW") Map lib;
/**
 * 事件
 */
private String event;
/**
 * 属性
 */
// private Map properties;
private @DataTypeHint("RAW") Map properties;
/**
 * 刷新时间
 */
private Long flush_time;
/**
 * 事件日期
 */
private String dt;
   

/**
 * 封装数据对象中字段信息
 */
public void assembly(CustomBuriedPointDO pointDO) {
// 复制DO属性到DTO
BeanUtils.copyProperties(pointDO, this);

/*
转换特殊字段
 */
// 设置分区日期
Long eventTimeLong = pointDO.getEvent_time();
if (eventTimeLong == null) {
eventTimeLong = System.currentTimeMillis();
}
Date eventTime = new Date(eventTimeLong);
DateFormat dateFormatDate = new
SimpleDateFormat("-MM-dd");
this.setDt(dateFormatDate.format(eventTime));

// json字段转换为Map类型
Map propertiesMap = null;
if (StringUtils.isNotBlank(pointDO.getProperties()))
{
propertiesMap = (Map)
JSON.parse(pointDO.getProperties());
}
this.setProperties(propertiesMap);
Map libMap = null;
if (StringUtils.isNotBlank(pointDO.getLib())) {
libMap = (Map)
JSON.parse(pointDO.getLib());
}
this.setLib(libMap);
}
}

第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
"CREATE TABLE test.test(" +
"  type STRING," +
"  lib MAP,"
+
"  properties
MAP" +
") PARTITIONED BY (" +
"  dt string" +
" ) stored as orcfile " +
" TBLPROPERTIES" +
" (" +
   
"'partition.time-extractor.kind'='custom'," +
   
"'partition.time-extractor.timestamp-pattern'='$dt'," +
   
"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+
   
"'sink.partition-commit.trigger'='partition-time'," +
 

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 chuyuan
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例:
{
"properties":{
"platformType":"APP",
"$os":"iOS",
"$screen_width":414,
"$app_version":"1.0",
"$is_first_day":false,
"$model":"x86_64",
"$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
"isLogin":false,
"zrIdfa":"----",
"$network_type":"WIFI",
"$wifi":true,
"$timezone_offset":-480,
"$resume_from_background":false,
"tdid":"",
"zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"$screen_height":896,
"$lib_version":"2.0.10",
"$lib":"iOS",
"$os_version":"13.4.1",
"$manufacturer":"Apple",
"$is_first_time":false,
"$app_id":"Com.ziroom..ZRSensorsSDK"
},
"type":"track",
"lib":{
"$lib_version":"2.0.10",
"$lib":"iOS",
"$app_version":"1.0",
"$lib_method":"autoTrack"
}
}
其中key为lib和properties的value是Json类型,其中字段可动态追加。

第一步,使用DataStream对象,接收kafka的Json,并实现MapFunction方法把Json串封装为DO,最后转换成DTO(为了把lib和properties的Json值转换为Map类型),
@Data
public static class CustomBuriedPointDTO {
/**
 * 跟踪ID
 */
private Long track_id;
/**
 * 事件时间
 */
private Long event_time;

/**
 * 类型
 */
private String type;
/**
 * 排重后Id
 */
private String distinct_id;
/**
 * 匿名ID
 */
private String anonymous_id;
/**
 * 包信息
 */
private @DataTypeHint("RAW") Map lib;
/**
 * 事件
 */
private String event;
/**
 * 属性
 */
// private Map properties;
private @DataTypeHint("RAW") Map properties;
/**
 * 刷新时间
 */
private Long flush_time;
/**
 * 事件日期
 */
private String dt;
   

/**
 * 封装数据对象中字段信息
 */
public void assembly(CustomBuriedPointDO pointDO) {
// 复制DO属性到DTO
BeanUtils.copyProperties(pointDO, this);

/*
转换特殊字段
 */
// 设置分区日期
Long eventTimeLong = pointDO.getEvent_time();
if (eventTimeLong == null) {
eventTimeLong = System.currentTimeMillis();
}
Date eventTime = new Date(eventTimeLong);
DateFormat dateFormatDate = new
SimpleDateFormat("-MM-dd");
this.setDt(dateFormatDate.format(eventTime));

// json字段转换为Map类型
Map propertiesMap = null;
if (StringUtils.isNotBlank(pointDO.getProperties()))
{
propertiesMap = (Map)
JSON.parse(pointDO.getProperties());
}
this.setProperties(propertiesMap);
Map libMap = null;
if (StringUtils.isNotBlank(pointDO.getLib())) {
libMap = (Map)
JSON.parse(pointDO.getLib());
}
this.setLib(libMap);
}
}

第二步,把DataStream转成了Hive临时表,最后写入Hive目标表,hive目标表定义如下:
"CREATE TABLE test.test(" +
"  type STRING," +
"  lib MAP,"
+
"  properties
MAP" +
") PARTITIONED BY (" +
"  dt string" +
" ) stored as orcfile " +
" TBLPROPERTIES" +
" (" +
   
"'partition.time-extractor.kind'='custom'," +
   
"'partition.time-extractor.timestamp-pattern'='$dt'," +
   
"'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor',"
+
   
"'sink.partition-commit.trigger'='partition-time'," +
 

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
Hi chuyuan,

可以详细描述下你遇到的问题么,比如下面这些信息
- 用的是哪个Flink版本
- SQL(包括DDL和query)
- 数据是什么样子的

chuyuan  于2020年9月21日周一 下午2:40写道:

>  LEGACY('RAW',
> 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常:
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
> 方便说下具体实现细节吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 chuyuan
 LEGACY('RAW', 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常:
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to be
resolved into a proper raw type.
方便说下具体实现细节吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Benchao Li
Hi,

目前Json Format的实现就是假设json最外层是一个json object,暂时还无法做到顶层的所有字段无限扩展。

如果是在SQL里面,可以直接定义成map类型就可以,比如:
```SQL
CREATE TABLE source (
  d MAP
) WITH (...)
```

Zhao,Yi(SEC)  于2020年8月11日周二 下午4:58写道:

> 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
>
> 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
> 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
>
> stEnv.connect(
> new Kafka()
> .properties(TestKafkaUtils.getKafkaProperties())
> .version("universal")
> .topic("test")
> .startFromLatest()
> ).withFormat(new Json()
> .failOnMissingField(false)
> ).withSchema(
> new Schema()
> .field("d", TypeInformation.of(Map.class))
> ).inAppendMode().createTemporaryTable("t");
>
> 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
> 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
> root
> |-- d: LEGACY('RAW', 'ANY')
>
>
> 在 2020/8/11 下午4:23,“zhao liang” 写入:
>
> Hi,你图挂了,换个图床试试呢
>
> 发件人: Zhao,Yi(SEC) 
> 日期: 星期二, 2020年8月11日 16:04
> 收件人: user-zh@flink.apache.org 
> 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
> 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
> [cid:image001.png@01D66FF8.F697E2D0]
> 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
> 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?
>
>
> 其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW',
> 'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
> root
> |-- d: LEGACY('RAW', 'ANY')
>
>
>

-- 

Best,
Benchao Li


Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Zhao,Yi(SEC)
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:

这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?

stEnv.connect(
new Kafka()
.properties(TestKafkaUtils.getKafkaProperties())
.version("universal")
.topic("test")
.startFromLatest()
).withFormat(new Json()
.failOnMissingField(false)
).withSchema(
new Schema()
.field("d", TypeInformation.of(Map.class))
).inAppendMode().createTemporaryTable("t");

其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')


在 2020/8/11 下午4:23,“zhao liang” 写入:

Hi,你图挂了,换个图床试试呢

发件人: Zhao,Yi(SEC) 
日期: 星期二, 2020年8月11日 16:04
收件人: user-zh@flink.apache.org 
主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?


其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')




答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 zhao liang
Hi,你图挂了,换个图床试试呢

发件人: Zhao,Yi(SEC) 
日期: 星期二, 2020年8月11日 16:04
收件人: user-zh@flink.apache.org 
主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下:
[cid:image001.png@01D66FF8.F697E2D0]
这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。
此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢?


其次,这种效果我打印了下table的schema如下,其中d的类型是LEGACY('RAW', 
'ANY'),貌似是某种兼容类型。此处第二个疑问,通过SQL方式如何创建这种结构呢?
root
|-- d: LEGACY('RAW', 'ANY')