Re: 如何设置FlinkSQL并行度

2020-08-30 文章 赵一旦
啥情况,你是调整了sql部分实现嘛。有示例嘛。

zilong xiao  于2020年8月29日周六 下午5:19写道:

> SQL 算子并行度设置可以自己实现,可以私下交流下,正好在做这块,基本能工作了
>
> JasonLee <17610775...@163.com> 于2020年8月23日周日 下午2:07写道:
>
> > hi
> > checkpoint savepoint的问题可以看下这个
> > https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-30 文章 zilong xiao
可以用程序来完成的,flink-conf.yaml里可以先用占位符,例如 `env.java.opts:
-Djob.name={{job_name}}`  在你提交作业之前,先读到这个模板文件,在代码里去replace该占位符就好,不需要手动去改

Jim Chen  于2020年8月31日周一 下午1:33写道:

> 我也是flink1.10.1的版本的,如果按照你的方法,每次启动一个任务,都要在flink-conf.yaml中修改一下`env.java.opts:
> -Djob.name=xxx`吗?这样的话,是不是太麻烦了
>
> zilong xiao  于2020年8月31日周一 下午12:08写道:
>
> > 想问下你用的flink哪个版本呢?
> > 如果是Flink 1.10-版本,可以在shell脚本中加上 -yD
> > jobName=xxx,然后在logback自定义PatternLayout中用环境变量`_DYNAMIC_PROPERTIES`获取
> > 如果是Flink 1.10+版本,则上述方式不可行,因为1.10+版本在作业启动执行 launch_container.sh
> > <
> >
> http://dn-rt199.jja.bigo:8042/node/containerlogs/container_e19_1597907464753_1954_01_01/zengkejie/launch_container.sh/?start=-4096
> > >脚本时,脚本中不再`export
> >  _DYNAMIC_PROPERTIES`变量,所以无法从环境变量获取,那么可以在flink-conf.yaml中添加
> > `env.java.opts: -Djob.name=xxx`,然后在 PatternLayout中获取启动参数即可
> >
> > 以上是我个人的实现方式,目前可正常运行,如有描述不正确的地方,欢迎探讨~
> >
> > Jim Chen  于2020年8月31日周一 上午11:33写道:
> >
> > > 我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到
> > >
> > >
> > > zilong xiao  于2020年8月27日周四 下午7:24写道:
> > >
> > > > 如果是用CLI方式提交作业的话是可以做到的
> > > >
> > > > Jim Chen  于2020年8月27日周四 下午6:13写道:
> > > >
> > > > > 如果是自动以PatternLayout的话,我有几点疑问:
> > > > >
> > > > >
> > > >
> > >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> > > > >
> > > > > 如果使用env的话
> > > > > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > > > > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > > > > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > > > >
> > >
> -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> > > > >
> > > > > zilong xiao  于2020年8月25日周二 下午5:32写道:
> > > > >
> > > > > >
> > > >
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > > > > 2:这些属性有办法可以从环境变量中获取
> > > > > >
> > > > > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > > > > >
> > > > > > > 大家好:
> > > > > > >
> >  我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > > > >
> > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > > > > 这个配置文件,是整个项目的,是基于Yarn的per
> > > job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: flink stream sink hive

2020-08-30 文章 Yun Gao
社区的邮件列表应该不支持图片,现在图看不到,要不直接把stack贴上来吧,或者用个图床。



 --Original Mail --
Sender:liya...@huimin100.cn 
Send Date:Thu Aug 27 19:09:51 2020
Recipients:user-zh 
Subject:flink stream sink hive

flink1.11.1 往hive2.1.1 的orc表写数据报的异常,在网上查不到,只能来这里了,麻烦大佬们帮我看看


liya...@huimin100.cn

Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-30 文章 Jim Chen
我也是flink1.10.1的版本的,如果按照你的方法,每次启动一个任务,都要在flink-conf.yaml中修改一下`env.java.opts:
-Djob.name=xxx`吗?这样的话,是不是太麻烦了

zilong xiao  于2020年8月31日周一 下午12:08写道:

> 想问下你用的flink哪个版本呢?
> 如果是Flink 1.10-版本,可以在shell脚本中加上 -yD
> jobName=xxx,然后在logback自定义PatternLayout中用环境变量`_DYNAMIC_PROPERTIES`获取
> 如果是Flink 1.10+版本,则上述方式不可行,因为1.10+版本在作业启动执行 launch_container.sh
> <
> http://dn-rt199.jja.bigo:8042/node/containerlogs/container_e19_1597907464753_1954_01_01/zengkejie/launch_container.sh/?start=-4096
> >脚本时,脚本中不再`export
>  _DYNAMIC_PROPERTIES`变量,所以无法从环境变量获取,那么可以在flink-conf.yaml中添加
> `env.java.opts: -Djob.name=xxx`,然后在 PatternLayout中获取启动参数即可
>
> 以上是我个人的实现方式,目前可正常运行,如有描述不正确的地方,欢迎探讨~
>
> Jim Chen  于2020年8月31日周一 上午11:33写道:
>
> > 我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到
> >
> >
> > zilong xiao  于2020年8月27日周四 下午7:24写道:
> >
> > > 如果是用CLI方式提交作业的话是可以做到的
> > >
> > > Jim Chen  于2020年8月27日周四 下午6:13写道:
> > >
> > > > 如果是自动以PatternLayout的话,我有几点疑问:
> > > >
> > > >
> > >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> > > >
> > > > 如果使用env的话
> > > > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > > > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > > > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > > >
> > -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> > > >
> > > > zilong xiao  于2020年8月25日周二 下午5:32写道:
> > > >
> > > > >
> > >
> 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > > > 2:这些属性有办法可以从环境变量中获取
> > > > >
> > > > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > > > >
> > > > > > 大家好:
> > > > > >
>  我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > > >
> > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > > > 这个配置文件,是整个项目的,是基于Yarn的per
> > job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-30 文章 zilong xiao
想问下你用的flink哪个版本呢?
如果是Flink 1.10-版本,可以在shell脚本中加上 -yD
jobName=xxx,然后在logback自定义PatternLayout中用环境变量`_DYNAMIC_PROPERTIES`获取
如果是Flink 1.10+版本,则上述方式不可行,因为1.10+版本在作业启动执行 launch_container.sh
脚本时,脚本中不再`export
 _DYNAMIC_PROPERTIES`变量,所以无法从环境变量获取,那么可以在flink-conf.yaml中添加
`env.java.opts: -Djob.name=xxx`,然后在 PatternLayout中获取启动参数即可

以上是我个人的实现方式,目前可正常运行,如有描述不正确的地方,欢迎探讨~

Jim Chen  于2020年8月31日周一 上午11:33写道:

> 我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到
>
>
> zilong xiao  于2020年8月27日周四 下午7:24写道:
>
> > 如果是用CLI方式提交作业的话是可以做到的
> >
> > Jim Chen  于2020年8月27日周四 下午6:13写道:
> >
> > > 如果是自动以PatternLayout的话,我有几点疑问:
> > >
> > >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> > >
> > > 如果使用env的话
> > > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > >
> -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> > >
> > > zilong xiao  于2020年8月25日周二 下午5:32写道:
> > >
> > > >
> > 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > > 2:这些属性有办法可以从环境变量中获取
> > > >
> > > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > > >
> > > > > 大家好:
> > > > > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > >
> %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > > 这个配置文件,是整个项目的,是基于Yarn的per
> job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > > >
> > > >
> > >
> >
>


回复: flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 sllence
Hi Zou Dan:

可以尝试下立刻语句是否可行
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

CREATE TABLE Orders (
user BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH ( 
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

-邮件原件-
发件人: me  
发送时间: 2020年8月30日 22:16
收件人: user-zh 
抄送: zoudanx 
主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time? 

如果是直接连接的hive catalog呢,是hive中已存在的表,直接去流式的连接读取?
您那有什么可解决的想法吗?


 原始邮件 
发件人: Zou Dan
收件人: user-zh
发送时间: 2020年8月30日(周日) 21:55
主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?


Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1] [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
 
<">https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
 Best, Dan Zou > 2020年8月30日 下午9:42,me  写道: > > flink1.11 
可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义 
事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。


Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-30 文章 Jim Chen
我现在是用shell脚本提交per job模式的任务,现在只能拿到yarn的applicationId,自定义的任务名,拿不到


zilong xiao  于2020年8月27日周四 下午7:24写道:

> 如果是用CLI方式提交作业的话是可以做到的
>
> Jim Chen  于2020年8月27日周四 下午6:13写道:
>
> > 如果是自动以PatternLayout的话,我有几点疑问:
> >
> >
> 1、logback加载时机的问题,就是①先会运行logback相关类,②再执行你自定义的PatternLayout,③再去执行你的主类,在②的时候,此时还没法确定具体的启动类是啥,这种方式没法根据job动态变化
> >
> > 如果使用env的话
> > 1、配置环境变量的话,如果yarn有10个节点。那么每台是不是都要配置一下
> > 2、因为是每个job都要传递,所以,这个应该是临时的环境变量吧
> > 3、如果是配置的临时环境变量的话,那么在执行bin/flink run的时候,shell中是执行java
> > -cp的,此时的主类,是org.apache.flink.client.cli.CliFrontend,这种方式,环境变量在传递的时候,会丢吧?
> >
> > zilong xiao  于2020年8月25日周二 下午5:32写道:
> >
> > >
> 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> > > 2:这些属性有办法可以从环境变量中获取
> > >
> > > Jim Chen  于2020年8月25日周二 下午4:49写道:
> > >
> > > > 大家好:
> > > > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > > > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > > > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > > > 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> > > >
> > >
> >
>


Re: flink1.11连接mysql问题

2020-08-30 文章 Danny Chan
这个问题已经有 issue 在追踪了 [1]

[1] https://issues.apache.org/jira/browse/FLINK-12494

Best,
Danny Chan
在 2020年8月28日 +0800 PM3:02,user-zh@flink.apache.org,写道:
>
> CommunicationsException


Re: flink1.11时间函数

2020-08-30 文章 Danny Chan
对应英文的 deterministic function 可以更好理解些 ~

Best,
Danny Chan
在 2020年8月29日 +0800 PM6:23,Dream-底限 ,写道:
> 哦哦,好吧,我昨天用NOW的时候直接报错告诉我这是个bug,让我提交issue,我以为这种标示的都是函数功能有问题的
>
> Benchao Li  于2020年8月28日周五 下午8:01写道:
>
> > 不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。
> > 对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。
> > 这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。
> >
> > Dream-底限  于2020年8月28日周五 下午2:50写道:
> >
> > > hi
> > >
> > > UNIX_TIMESTAMP()
> > >
> > > NOW()
> > >
> > > 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 Danny Chan
能否提供下完整的 query,方便追踪和排查 ~

Best,
Danny Chan
在 2020年8月31日 +0800 AM10:58,zhuyuping <1050316...@qq.com>,写道:
> 同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长
> 好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g
> 不断的无限增长下去
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-sql-gateway还会更新吗

2020-08-30 文章 shougou
感谢各位的辛苦付出,今天就准备试一下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 zhuyuping
同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长
好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g
不断的无限增长下去



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 zhuyuping
我这边出现同样的问题,我换成了filesystem 发现state 还是一样缓慢增大,所以应该跟rocksdb 无关



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.10 sql state 聚合函数 窗口统计,状态越来越大,无法自动清理问题

2020-08-30 文章 zhuyuping
http://apache-flink.147419.n8.nabble.com/flink1-10-1-1-11-1-sql-group-td5491.html


跟这个问题类似 

Filesystem rocksdb  都试过,rowtime proctime 窗口统计都试过。


CREATE VIEW cpd_expo_feature_collect_view as select
imei,incrmentFeatureCollect(CAST(serverTime AS INT),adId) as feature from
dwd_oth_ads_appstore_exposure_process_view
  group by TUMBLE (proctime, INTERVAL '10' SECOND),imei;



--
Sent from: http://apache-flink.147419.n8.nabble.com/


来自邮件帮助中心的邮件

2020-08-30 文章 邮件帮助中心



flink1.10 sql state 聚合函数 窗口统计,状态越来越大,无法自动清理问题

2020-08-30 文章 zhuyuping

   

现在flink 使用
如下sql:
我创建了一个聚合函数,就是mapview 简单的put ,然后返回string

然后我使用翻转窗口10s ,1s,1分钟进行统计,
但是出现了每隔3个 ,状态就会增大。最后状态会越来越大,导致checkpoint失败,任务重启,
刚开始以为是反压。最后我使用insert into discardSink ,也是出现同样的问题
sql:
CREATE VIEW cpd_xx_view as select
imei,incrmentFeatureCollect(CAST(serverTime AS INT),adId) as feature from
xxx_view
  group by TUMBLE (proctime, INTERVAL '10' SECOND),imei;  






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 zhuyuping

 

 

我也出现了这个问题, 我使用的是窗口函数进行group by 

发现state 不会清空,还是10m 到后面 几G 缓慢增长,大概每3个checkpoint 增长
任务没有反压。为了测试我使用discardSink 

先后 换了 1 second 1分钟,还有proctime rowtime模式 来窗口统计都一样 ,state缓慢增大

CREATE VIEW cpd_expo_feature_collect_view as select
imei,incrmentFeatureCollect(CAST(serverTime AS INT),adId) as feature from
dwd_oth_ads_appstore_exposure_process_view
  group by TUMBLE (proctime, INTERVAL '10' SECOND),imei;



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 Rui Li
Hi,

这个场景目前还是不支持的。定义watermark需要在DDL里做,hive表本身没有这个概念,所以DDL里定义不了。以后也许可以通过额外的参数来指定watermark。

On Sun, Aug 30, 2020 at 10:16 PM me  wrote:

> 如果是直接连接的hive catalog呢,是hive中已存在的表,直接去流式的连接读取?
> 您那有什么可解决的想法吗?
>
>
>  原始邮件
> 发件人: Zou Dan
> 收件人: user-zh
> 发送时间: 2020年8月30日(周日) 21:55
> 主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?
>
>
> Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1] [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> <">
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
> Best, Dan Zou > 2020年8月30日 下午9:42,me  写道: > > flink1.11
> 可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义
> 事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。



-- 
Best regards!
Rui Li


Re: flink-sql-gateway还会更新吗

2020-08-30 文章 godfrey he
已更新至flink1.11.1

godfrey he  于2020年8月24日周一 下午9:45写道:

> 我们会在这周让flink-sql-gateway支持1.11,请关注
> 另外,sql-client支持gateway模式,据我所知目前还没计划。
>
> shougou <80562...@qq.com> 于2020年8月24日周一 上午9:48写道:
>
>> 也有同样的问题,同时也问一下,sql client 计划在哪个版本支持gateway模式?多谢
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 me
如果是直接连接的hive catalog呢,是hive中已存在的表,直接去流式的连接读取?
您那有什么可解决的想法吗?


 原始邮件 
发件人: Zou Dan
收件人: user-zh
发送时间: 2020年8月30日(周日) 21:55
主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?


Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1] [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
 
<">https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
 Best, Dan Zou > 2020年8月30日 下午9:42,me  写道: > > flink1.11 
可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义 
事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。

Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 Zou Dan
Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
 


Best,
Dan Zou

> 2020年8月30日 下午9:42,me  写道:
> 
> flink1.11 可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义  
> 事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。



flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 me
flink1.11 可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义  
事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。

Re?? ??????????????????UV??????????????MapState??BloomFilter,??checkpoint????????????????????

2020-08-30 文章 Yichao Yang
Hi,


??Longuv?? RoaringBitMap[1]


[1] https://mp.weixin.qq.com/s/jV0XmFxXFnzbg7kcKiiDbA


Best,
Yichao Yang




--  --
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian


x <35907...@qq.com> ??2020??8??27?? 1:48??

>
> 
UV??MapStateBloomFilter??,checkpoint??bloomMapState

Re:启动任务异常, Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1

2020-08-30 文章 RS
测试出来了, rowtime参数需要是最后一个参数, $(timeField).rowtime()

但是这个报错也太隐晦了吧 .





在 2020-08-30 14:54:15,"RS"  写道:
>Hi, 请教下
>
>
>启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下
>这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理
>DataStreamSource source = env.addSource(consumer);
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>tableEnv.createTemporaryView(table_name, source, 
> $(timeField).rowtime(), $("cpu"));
>tableEnv.from(table_name).window(
>Tumble.over(lit(1).minutes())
>.on($(timeField))
>.as(table_name + "Window")
>);
>tableEnv.executeSql(sql1);  // CREATE TABLE t_out (`ts` TIMESTAMP(3), 
> `count` BIGINT) WITH ('connector' = 'print')   没有报错
>tableEnv.executeSql(sql2);  //  INSERT INTO t_out SELECT 
> TUMBLE_START(`ts`, INTERVAL '1' MINUTE), COUNT(1) as `count` FROM t1 GROUP BY 
> TUMBLE(`ts`, INTERVAL '1' MINUTE)  抛异常
>
>
>异常堆栈:
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: Index: 1, Size: 1
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>
>at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>
>at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>
>at java.security.AccessController.doPrivileged(Native Method)
>
>at javax.security.auth.Subject.doAs(Subject.java:422)
>
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>
>at 
>org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
>Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
>
>at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>
>at java.util.ArrayList.get(ArrayList.java:433)
>
>at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
>
>at 
>org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:682)
>
>at 
>org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:665)
>
>at 
>org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:561)
>
>at 
>org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1(ExprCodeGenerator.scala:184)
>
>at 
>org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(ExprCodeGenerator.scala:158)
>
>at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
>at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
>
>at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
>
>at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
>
>at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>
>at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>
>at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
>
>at 
>org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateConverterResultExpression(ExprCodeGenerator.scala:158)
>
>at 
>org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:103)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:126)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:55)
>
>at 
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>
>at 
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlan(StreamExecDataStreamScan.scala:55)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>
>at 
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>
>at 
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>
>at 
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>
>at 
>org.apache.flink.table.planner.p