Re: Upsert kafka 作为 source 的几个问题

2021-05-10 文章 macdoor
在做join时 ksql 会强制检查两个表的key是否相同,如果不同则报错,感觉这是一个比较好的方法。
你说 “目前 Upsert-kafka 要求具有相同key的数据在相同 partition 的。因为 kafka 仅保证 partiiton 内按
offset 读取,如果相同 key 的数据分布在不同 partition 的话,那么读取会乱序。”
flink 中两个表不使用相同的key 也可以成功 join ,但数据会出现错误,这样的话,在编译sql时报错应该更好



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Fw:flink on native kubernetes要如何修改Logging配置?

2021-05-10 文章 Yang Wang
你只需要修改本地Flink conf目录下面的log4j-console.properties就可以了

这个配置文件会通过ConfigMap自动ship到JM和TM上并使用

Best,
Yang

casel.chen  于2021年5月8日周六 下午11:57写道:

>
> 求大佬解答一下,谢谢!
>
>
>
>
>
>  转发邮件信息 
> 发件人:"casel.chen" 
> 发送日期:2021-05-08 11:50:03
> 收件人:"user-zh@flink.apache.org" 
> 主题:flink on native kubernetes要如何修改Logging配置?
> 我用native kubernetes方式部署flink session
> cluster,想修改某个包下的日志级别,如果直接修改configmap下的log4j-console.properties再重新部署是能生效的,但是通过命令行
> (./bin/kubernetes-session.sh -Dkubernetes.cluster-id=xxx) 起flink session
> cluster会将之前的修改冲掉,有什么办法可以保留下之前的修改吗?是否有命令行启动参数可以指定自定义的logging配置?谢谢!
>
>
>
>
>
>
>


Re: About the windowOperator and Watermark

2021-05-10 文章 Dawid Wysakowicz
Hi,

When a Watermark arrives the window operator will emit all windows that
are considered finished at the time of the Watermark. In your example
(assuming both windows are finished) they will both be emitted.

Best,

Dawid

On 08/05/2021 08:03, 曲洋 wrote:
> Hi Experts,
>
> Given that a window in the stream is configured with short window size like 
> timeWinodw(3s),
> and I gotta utilize Event time and Periodic Watermark.
> The stream input is [watermark(7) | 6, 5, 3, 4, 1, 2],
> and then two windows are created (3,1,2) (6,5,4) before watermark(7) arriving.
> But in this situation when the current watermark is received,
> which window or how many windows will be be triggered to fire and emit?
> My question is what will the windowOperater do when it comes to two parellel 
> windows edge end timestamps both smaller than cerrent watermark timestamps?



OpenPGP_signature
Description: OpenPGP digital signature


回复:FlinkCEP Pattern匹配的问题

2021-05-10 文章 张义藤
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/libs/cep.html#%E5%8C%B9%E9%85%8D%E5%90%8E%E8%B7%B3%E8%BF%87%E7%AD%96%E7%95%A5


官方文档中 这个API 或许可以解决你的问题


| |
张义藤
|
|
inte...@163.com
|
签名由网易邮箱大师定制
在2021年5月10日 18:48,lp<973182...@qq.com> 写道:
我有一个flinkCEP的程序,检测nginx日志,假如同一ip,60s内超过3次访问,则报警。
我访问了7次,代号为1~7
检测到了4组报警分别是
[/1, /2, /3]
[/2, /3, /4]
[/3, /4, /5]
[/4, /5, /6]

请问下,如果想之前已经参与过匹配的数据不再参与匹配,应该怎样做,比如其实我想得到两组报警:
[/1, /2, /3]
[/4, /5, /6]


如下是我检测的关键代码:
Pattern pattern =
Pattern.begin("start").times(3).within(Time.seconds(60));




--
Sent from: http://apache-flink.147419.n8.nabble.com/


FlinkCEP Pattern匹配的问题

2021-05-10 文章 lp
我有一个flinkCEP的程序,检测nginx日志,假如同一ip,60s内超过3次访问,则报警。
我访问了7次,代号为1~7
检测到了4组报警分别是
[/1, /2, /3]
[/2, /3, /4]
[/3, /4, /5]
[/4, /5, /6]

请问下,如果想之前已经参与过匹配的数据不再参与匹配,应该怎样做,比如其实我想得到两组报警:
[/1, /2, /3]
[/4, /5, /6]


如下是我检测的关键代码:
Pattern pattern =
Pattern.begin("start").times(3).within(Time.seconds(60));




--
Sent from: http://apache-flink.147419.n8.nabble.com/


prometheus metric中如何设置label

2021-05-10 文章 suisuimu
基于RichMapFunction自定义Prometheus metric时增加label
```
counter = this.metricGroup
.addGroup("app", value.getAppName())
.addGroup("exp", value.getExceptionName())
.counter("myExpCounter");
```
通过add group可以在metric中看到label信息
flink_taskmanager_job_task_operator_app_exp_myExpCounter{app="",endpoint="pushgateway",exp="java_io_IOException",flink_namespace="xxx",host="11_7_9_11",job="xxx",job_id="xxx",job_name="custom_log",namespace="monitoring",operator_id="cf155f65686cb012844f7c745ec70a3c",operator_name="Map",pod="pushgateway-c7648cd5c-tvfb9",service="pushgateway",subtask_index="0",task_attempt_id="7d6fd088c0628eb564753939978086eb",task_attempt_num="0",task_id="cbc357ccb763df2852fee8c4fc7d55f2",task_name="Source:_Custom_SourceMapProcessMapSink:_Print_to_Std__Out",tm_id="10_7_9_71:6122_96edca"}

想问下:
1、是否是这种方式增加label
2、由于采用了add group的方式,导致exp对应的值里面的 ‘.’ 变成了下划线,是否有办法保持为'.'



--
Sent from: http://apache-flink.147419.n8.nabble.com/


yarn-cluster模式下flink的类加载顺序问题

2021-05-10 文章 Willson
*遇到的问题*
根据flink写入hive的最佳实践(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#dependencies),
我们添加了flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar到集群的lib目录下.
该jar包中使用了3.1版本的common-lang3
我们想要使用3.3.2版本的common-lang3, 于是手动打包了logformat.jar,
但发现任务执行时还是调用了3.1版本的common-lang3

*我们的处理方式*
在官网上看到yarn-cluster模式下,
不会使用动态类加载(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#overview-of-classloading-in-flink).
 
我们的理解是, 不使用动态类加载, 也就不会特意优先加载用户的jar, 此时会根据jar包的名字从a到z依次加载. 
于是我们把logformat.jar改名为alogformat.jar, 以保证用户jar包的名字在排序时有更高的优先级.
然后再执行任务时, 正常调用了3.3.2版本的common-lang3

*问题总结*
1. yarn-cluster模式下确实是根据jar包的名字从a到z依次加载的吗?
2. 在yarn-cluster模式下, 有没有办法不更改用户jar包的名字, 但是优先加载用户jar包

*组件版本*
flink: 1.12.0
hive: 2.1.0



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 流批一体的实践

2021-05-10 文章 Jacob
Deal All,

*【业务场景】*

step1:Flink消费kafka,将消息进行加工处理,每半个小时要生成一次增量数据,然后将这半个小时的数据写到HDFS的一个orc文件
step2:将上一步flink
Job写到hdfs的文件load到hive表,然后和全量数据(hive表)进行Join,将Join后的结果根据某个字段进行group
by,再写入到几张结构相同的hive表。执行完后将增量文件删除

*【存在的问题】*

1. 上面的两步分别要维护两套代码,开发和维护都比较麻烦

2.
第二步依赖于hadoop做MapReduce,增量和全量Join速度较慢,且资源占用较大,而且当第二步运行时,如果没有资源,将无法在半个小时内生成Join结果,那么就没有及时删除增量文件,导致第一步Flink
job会连续生成多个增量文件,等到Join的MapReduce
job有资源后,需要将多个增量文件merge,再与全量Join,这样以来整个过程又被拖慢。
3. 时效性不好,本身是一个整体,被拆成两步,存在job累加,得等上一步flink job
hdfs写入完成了,下一个定时Job才能正常运行,让原有的实时性效果变差

【请教】

能否把上面两步合二为一?该场景应该也算是流批同时存在的业务,Flink 的“流批一体”,能否实现?

由于第一步中的Flink Job的执行环境是StreamExecutionEnvironment ,如果用Flink
SQL去运行增量和全量的Join,那就是另外的TableEnvironment了,我不太清楚怎么在同一个Job去构建这两种环境,以及,Join后要写入几张不同的hive表,必然会运行多个sql语句,我看到的一些demo
都是一次跑一个sql语句,那么这种需求是可以实现的吗?



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/