Re: StreamTableEnvironment.fromDataStream(dataStream)如何生成T类型的Schema?

2022-09-23 文章 Shammon FY
Hi

你的Event类定义没有贴完整,不过有个怀疑点,你应该有一个public Event(int, String, long,
long)的构造函数,需要在Event定义里增加一个空的构造函数,类似这样
public class Event implements Serializable {
private static final long serialVersionUID = 4826873295740075360L;
public int t = 0;
public String user = "";
public long event = 0L;
public long timestamp = LocalDateTime.nowTimeMillis();

public Event() { }
}

Flink需要根据空的构造函数才能识别出这是一个pojo类

On Fri, Sep 23, 2022 at 5:37 PM Frank  wrote:

> Hi, Shammon,
>
>
> 嗯,我是该贴一下Event类,如下:
>
>
> public class Event implements Serializable {
>
>
> private static final long serialVersionUID = 4826873295740075360L;
>
>
> public int t = 0;
> public String user = "";
> public long event = 0L;
> public long timestamp = LocalDateTime.nowTimeMillis();
>
>
> }
>
>
> 你可以看到,字段都是public。
>
>
> 我刚试著改成private然后添加getter, setter,但结果一样。
>
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "Shammon FY"
>   <
> zjur...@gmail.com>;
> 发送时间: 2022年9月23日(星期五) 中午11:20
> 收件人: "user-zh" >;
>
> 主题: Re:
> StreamTableEnvironment.fromDataStream(dataStream)如何生成T类型的Schema?
>
>
>
> Hi @frank
>
> 你没有贴你定义的Event类代码,我觉得你的Event类定义有点问题
>
> 如果需要flink识别Event的内部字段,需要将Event定义成pojo类,例如将字段都定义成public,或者可以是private,但是需要增加setXXX和getXXX函数
>
>
>
>
>
>
> On Thu, Sep 22, 2022 at 5:45 PM Frank 
> DataStreamSource                  new Event(0,
> "张三", 1L), new Event(0, "孙小美", 1L));
>  StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>  Table table = tenv.fromDataStream(datastream)
>  table.printSchema();
>  为什么上面代码生成table的schema是下面这样而不是Event的字段(t, user, event, timestamp)?
>  (
>    `f0` RAW('utils.transfor.Event', '...')
>  )
>  怎么改?


Re: Re: flink实时双流驱动join问题

2022-09-23 文章 Zhiwen Sun
实际业务的确是这样的。
state 永不过期, 要全量的数据计算,全量的数据放到 state 里面。

目前看来只有等 flink table store 了。

Zhiwen Sun



On Fri, Sep 23, 2022 at 8:29 AM casel.chen  wrote:

>
> 我这里只是举了一个例子表示Flink用于OLAP实时关联场景会遇到的一个问题,实际业务中确实会出现两张关联表都需要更新情况,不管哪一边更新数据业务都想获取到最新关联结果,而不是旧的关联状态。引出我想问的另一个问题是如果查询模式固定,Flink实时关联是否能取代OLAP系统例如Doris呢?
> 1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月?
> 确切地说不应该设置ttl,业务数据有长尾效应,大多数都在当天更新完毕,短的几秒种,长的甚至会在半年后还发生更新
>
>
> 2. order流和user流在业务场景上要求的state ttl时长是不是不一样?
> 同上
>
>
> 3. order流和user流的数据规模/state size规模大概可以到什么级别?
> TB级别
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-09-20 10:28:49,"Jinzhong Li"  写道:
> >hi,casel, 关于你们的业务场景,我有几个问题, 希望可以交流一下。
> >1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月?
> >2. order流和user流在业务场景上要求的state ttl时长是不是不一样?
> >(从你描述上来看,user流的ttl需要几个月,order流可以比较短些?)
> >3. order流和user流的数据规模/state size规模大概可以到什么级别?
> >
> >casel.chen  于2022年9月17日周六 10:59写道:
> >
> >> 请教一个flink实现实时双流驱动join问题:
> >>
> >>
> >> order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键)
> >> user cdc流字段:user_id, user_name, user_phone, user_address(user_id是主键)
> >> 关联结果流字段:order_id, order_status, order_time, user_name, user_phone,
> >> user_address(order_id是主键)
> >> 期望当order流数据更新或user流数据更新时,关联结果流数据都会得到更新。inner join不满足是因为两条流distinct
> >> id都很大,状态会很大,且不能TTL,因为user流更新时间不定,短的几小时,长达上月。
> >>
> >>
> >> 请问这种场景下要如何使用flink实现实时双流驱动join?
>