Re: 回复: flinksql ttl不生效

2021-06-14 Thread wenlong.lwl
保留一段时间,就不是temporal join的语义了,你可以试试用interval join再做个去重,但是去重也有state的开销。

On Fri, 11 Jun 2021 at 10:44, chenchencc <1353637...@qq.com> wrote:

> 嗯嗯,我这边排查看到是我是用temporary left join
>
> 维度表,使用事件时间,但是我期望维度表只保留3小时。目前使用on加上时间范围,貌似不生效,导致join的状态不断增加。有什么方式能处理吗,保留维度表一段时间数据。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.13.1 org.apache.flink.table.catalog.Column 类为什么没有实现Serializable接口

2021-06-15 Thread wenlong.lwl
不能直接支持serializable,里面包含的field的类型太多了,不大可能全部支持serializable,序列化可以参考:
CatalogPropertiesUtil

On Tue, 15 Jun 2021 at 11:45, Asahi Lee <978466...@qq.com> wrote:

> hi!
>      org.apache.flink.table.catalog.Column
> 没有Serializable实现,是否可以支持Serializable?


Re: Flink 状态使用问题咨询

2019-04-16 Thread wenlong.lwl
可以封装一下state 的访问,从state get不到数据的时候,去数据库里取下,更新到state里

On Tue, 16 Apr 2019 at 20:53, zhang yue  wrote:

> 是的,我希望从mysql加载初始的状态,因为我的kafka消息是从某个时间点开始的,在这个时间点之前的数据需要先加载到flink state
> 那现在对于这种场景有什么替代方案吗
>
> > 在 2019年4月16日,下午8:33,Congxian Qiu  写道:
> >
> > Hi
> > 如果你希望程序在刚开始运行的时候从外部存储加载数据,这个暂时做不到,不过现在社区正在做 Savepoint Reader/Writer
> 相关的事情,到时候就可以了
> >
> > Best, Congxian
> > On Apr 16, 2019, 20:27 +0800, zhang yue , wrote:
> >> 你好,我有一个keyed
> state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
> >>
> >>
> >> class LineItemStat extends RichFlatMapFunction {
> >>
> >> /**
> >> * The ValueState handle. The first field is the count, the second field
> a running sum.
> >> */
> >> private transient MapState stat_value;
> >>
> >> @Override
> >> public void flatMap(ObjectNode input, Collector out) throws
> Exception {
> >>
> >> // access the state value
> >>
> >> }
> >>
> >> @Override
> >> public void open(Configuration config) {
> >> MapStateDescriptor descriptor =
> >> new MapStateDescriptor(
> >> "stat_value",String.class, Long.class); // default value of the state,
> if nothing was set
> >> stat_value = getRuntimeContext().getMapState(descriptor);
> >> }
> >> }
> >>
>
>


Re: Flink 状态使用问题咨询

2019-04-24 Thread wenlong.lwl
访问db获取初始state,是要hack下的,要自己保存一个KeySelector,算下当前记录的Key是什么,这个逻辑不用改flink,写到你业务代码里就好了。

On Wed, 24 Apr 2019 at 21:23, Shi Quan  wrote:

> 主要是考虑是在从异常恢复场景下,业务state是否需要重新加载。如果不需要重新加载,就不要记录这么多时间用来判断了。
>
>
>
> Sent from Mail for
> Windows 10
>
>
>
> 
> From: zhang yue 
> Sent: Wednesday, April 24, 2019 8:29:07 PM
> To: user-zh@flink.apache.org
> Subject: Re: Flink 状态使用问题咨询
>
> 嗯,明白你的意思,initTime < openTime是做何考虑,什么情况下initTime < openTime会满足
>
> > 在 2019年4月24日,下午8:16,Shi Quan  写道:
> >
> > initTime < openTime
>
>


Re: Flink-1.9.1

2019-11-06 Thread wenlong.lwl
想输出指定格式可以试试用DATE_FORMAT函数做一下格式化,转成string

On Wed, 6 Nov 2019 at 19:36, somnus...@163.com  wrote:

> 针对于使用 SQL查询,select current_timestamp,creaetTime from sourceTable ;其中
> createTime 注册成 SQL_TIMESTMAP 类型
> 问题描述:
> 无论是流中的原有数据 createTime,还是 current_time 函数
> 打印的结果都出现格式的问题
> 2019-11-06T11:01:24.047,2019-11-06T18:53:54
> 请问:
> 如何避免格式中 T 的出现
>
> 针对于上封邮件进行补充:
> 做了类型检测:flink 1.9.1 对于执行完SQL之后的类型,date 使用 java.time.LocalDate;timestamp 使用
> java.time.LocalDateTime
> 而1.7.2版本是 date 使用 java.sql.date;timestamp 使用 java.sql.timestamp
> 请问:
> 是出于什么考虑改变这个策略的?
>
>
>
>
>
>


Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

2019-11-06 Thread wenlong.lwl
可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。

On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:

> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
> 如何在聚合函数中使用State?
>
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
> TypeInformation}
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.functions.{AggregateFunction,
> FunctionContext}
> import java.lang.{Iterable => JIterable}
>
>
> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>
> class IntDiffSumFunction extends AggregateFunction[Int,
> IntDiffSumAccumulator] {
>
>   override def open(context: FunctionContext): Unit = {
> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
> //getRuntimeContext.getState(desc)
> val a = this.hashCode()
> print(s"hashCode:$a")
> super.open(context)
>   }
>
>   override def createAccumulator(): IntDiffSumAccumulator = {
> val acc = new IntDiffSumAccumulator()
> acc.f0 = 0
> acc.f1 = false
> acc
>   }
>
>   def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
> accumulator.f0 += value
> accumulator.f1 = true
>   }
>
>   override def getValue(accumulator: IntDiffSumAccumulator): Int = {
> if (accumulator.f1) {
>
>   accumulator.f0
> } else {
>   Int.MinValue
> }
>   }
>
>   def merge(acc: IntDiffSumAccumulator, its:
> JIterable[IntDiffSumAccumulator]) = {
> val iter = its.iterator()
> while (true) {
>   val a = iter.next()
>   if (a.f1) {
> acc.f0 += a.f0
> acc.f1 = true
>   }
> }
>   }
>
>   def resetAccumulator(acc: IntDiffSumAccumulator) = {
> acc.f0 = 0
> acc.f1 = false
>   }
>
>   override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.BOOLEAN_TYPE_INFO)
> }
>
>
> From stevenchen
>  webchat 38798579
>
>
>