请教flink cep如何对无序数据处理

2021-05-13 Thread sherlock zw
兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛?
我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件


回复: 请教flink cep如何对无序数据处理

2021-05-15 Thread sherlock zw
老哥,这个图片好像我这边好像显示不出来。能否再重新发一次


发件人: Peihui He<mailto:peihu...@gmail.com>
发送时间: 2021年5月14日 19:55
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: Re: 请教flink cep如何对无序数据处理

[cid:ii_koo9jn4u1]

这样可以不?

sherlock zw mailto:zw30...@live.com>> 于2021年5月14日周五 上午8:52写道:
兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛?
我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件



关于CEP处理事件的问题

2021-06-10 Thread sherlock zw
大佬们,请教一下,我现在使用CEP时遇到一个问题,我现在的场景是需要输入三次相同字符串打印一次匹配的List集合,但是遇到的问题是每次都需要输入第四条数据才会触发Pattern的select函数去打印List。
具体实现代码如下:
public class Run3 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final DataStream source = env.socketTextStream("localhost", 
)
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner((String s, long ts) -> 
System.currentTimeMillis())
)
.keyBy(s -> s);
source.print("source ");
final Pattern pattern = Pattern.begin("begin", 
AfterMatchSkipStrategy.skipPastLastEvent())
.where(new SimpleCondition() {
@Override
public boolean filter(String s) throws Exception {
return true;
}
}).times(3);
final PatternStream patternStream = CEP.pattern(source, 
pattern);
patternStream.select(new PatternSelectFunction() {
@Override
public Object select(Map> pattern) {
return pattern.get("begin");
}
}).print("result ");
env.execute();
}
}

环境如下:
Flink 1.12.2
OS:Windows 10
编程工具:IDEA 2021.1.2
使用的是Flink默认的事件时间,水位线用的是单调递增的,使用的是系统时间

运行结果如下所示:
[cid:image001.png@01D75E43.4DB77F10]



请教下flink的提交方式

2022-07-04 Thread sherlock zw
目前我需要去监控已经提交的flink任务, 
但是通过命令行方式提交的话拿不到任务id,只能通过INFO级别的日志过滤出来,但是我们的环境里面的日志界别是WARN,看不到任务id的日志输出,所以想问下除了命令行的方式提交任务还有其他方式吗,例如有和Spark类似的SparkLaunch一样的jar提交的方式吗?希望大佬指点下,谢谢。




Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 Thread sherlock zw
大佬们:
   在 Flink 1.10.x 中的 keyBy 算子可以同时按多个字段分组,比如 map.keyBy(0,1),但在 1.11.x 
版本中这种方式被弃用了,看了下源码好像不支持按多字段分组了?还是有别的其他形式?
   如果我想按多个字段分组的话需要怎么操作?
   请大佬指点!


答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 Thread sherlock zw
我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector 
key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t -> 
t.f0)),如果我想按多个字段进行分组的话该怎么操作呢?

-邮件原件-
发件人: guanxianchun  
发送时间: 2020年11月19日 20:53
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

flink-1.11使用KeySelector



--
Sent from: http://apache-flink.147419.n8.nabble.com/


答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-22 Thread sherlock zw
多谢指点,试了下,返回 Tuple 类型作为 key 是可以按多个字段进行分组的,拼接成 String 的话应该也是可以的
final SingleOutputStreamOperator> sum = flatMap
.keyBy(new KeySelector, 
Tuple2>() {
@Override
public Tuple2 getKey(Tuple3 tuple3) throws Exception {
return Tuple2.of(tuple3.f0, tuple3.f1);
}
})
.sum(2);

-邮件原件-
发件人: 赵一旦  
发送时间: 2020年11月23日 14:35
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

(1)返回字符串,自己拼接就可以。
(2)返回Tuple类型作为Key。
1.10到1.11相当于是去除了多key的辅助keyBy方法,本身内部就是组成tuple。原因不清楚。

sherlock zw  于2020年11月20日周五 上午11:18写道:

> 我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector
> key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t ->
> t.f0)),如果我想按多个字段进行分组的话该怎么操作呢?
>
> -邮件原件-
> 发件人: guanxianchun 
> 发送时间: 2020年11月19日 20:53
> 收件人: user-zh@flink.apache.org
> 主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题
>
> flink-1.11使用KeySelector
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>