Paolo Rendano created FLINK-7549:
------------------------------------
Summary: CEP - Pattern not discovered if source streaming is very fast
Key: FLINK-7549
URL: https://issues.apache.org/jira/browse/FLINK-7549
Project: Flink
Issue Type: Bug
Components: CEP
Affects Versions: 1.3.2, 1.3.1
Reporter: Paolo Rendano
Hi all,
I'm stress testing my pattern, using JMeter to publish source data to a
RabbitMQ queue. The queue contains statuses generated by different devices.
My test plan loops for 1000 cycles, each cycle sending the first and then the
second status that together generate the event through Flink CEP (statuses are
keyed by device).
In my early tests I noticed that I get only partial results in output (70-80%
of the expected ones). Introducing a delay in the JMeter plan between the
sending of the two statuses solved the problem. The minimum delay that makes
things work is 20-25 ms (this is on my local machine; it may vary on other
machines).
My code is structured this way (the following is a simplification):
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100L);

// source definition
DataStream<MyMessageWrapper> dataStreamSource =
        env.addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
                        conf.getSourceExchange(),
                        conf.getSourceRoutingKey(),
                        conf.getSourceQueueName(),
                        true,
                        new MyMessageWrapperSchema()))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
                    private static final long serialVersionUID = -1L;

                    @Override
                    public long extractTimestamp(MyMessageWrapper element) {
                        if (element.getData().get("stateTimestamp") == null) {
                            throw new RuntimeException(
                                    "Status Timestamp is null during time ordering for device ["
                                            + element.getData().get("deviceCode") + "]");
                        }
                        return FlinkUtils.getTimeFromJsonTimestamp(
                                element.getData().get("stateTimestamp")).getTime();
                    }
                })
                .name("MyIncomingStatus");

// PATTERN DEFINITION
Pattern<MyMessageWrapper, ?> myPattern = Pattern
        .<MyMessageWrapper>begin("start")
        .subtype(MyMessageWrapper.class)
        .where(whereEquals("st", "none"))
        .next("end")
        .subtype(MyMessageWrapper.class)
        .where(whereEquals("st", "started"))
        .within(Time.minutes(3));

// CEP DEFINITION
PatternStream<MyMessageWrapper> myPatternStream =
        CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
DataStream<Either<TimeoutEvent, MyMessageWrapper>> outputStream =
        myPatternStream.flatSelect(patternFlatTimeoutFunction,
                patternFlatSelectFunction);

// SINK DEFINITION
outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange,
        new MyMessageWrapperSchema())).name("MyGeneratedEvent");
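
For reference when reading the watermark setup above, here is a minimal
standalone sketch (not Flink code) of the arithmetic behind a bounded
out-of-orderness watermark: the watermark trails the highest timestamp seen so
far by the configured bound, here Time.minutes(1). The class and method names
are hypothetical and exist only for this illustration. It assumes the job runs
in event time, which the simplified snippet does not show.

```java
/**
 * Standalone model (not Flink code) of a bounded out-of-orderness watermark.
 * All names here are hypothetical and used for illustration only.
 */
final class BoundedWatermarkModel {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedWatermarkModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low, but high enough that the subtraction below cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records the timestamp extracted from one incoming element. */
    void observe(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    /** The watermark lags the highest timestamp seen by the bound. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        // 60_000 ms corresponds to Time.minutes(1) in the job above.
        BoundedWatermarkModel model = new BoundedWatermarkModel(60_000L);
        model.observe(100_000L); // first status
        model.observe(100_020L); // second status, 20 ms later
        System.out.println(model.currentWatermark()); // prints 40020
    }
}
```

Under that assumption, elements whose timestamps are still ahead of the
watermark stay buffered in the CEP operator, so once the test stops sending
messages the last minute of statuses may not be evaluated until newer events
arrive.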
Digging in and logging the messages received by Flink in "extractTimestamp", I
see that at such a high message rate the source may receive messages with the
same timestamp but a different deviceCode.
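
Since the pattern is evaluated per device, a tie between two devices is a
different situation from a tie between two statuses of the same device, where
the timestamp alone no longer says which status came first. A minimal
standalone sketch of how the logged values could be checked for the second
case follows. It is not Flink code, and the Status class and its fields are
hypothetical stand-ins for what is logged in extractTimestamp.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Standalone diagnostic sketch; all names are hypothetical. */
final class TimestampTieCheck {

    /** Stand-in for the values logged per message. */
    static final class Status {
        final String deviceCode;
        final String st;
        final long timestampMillis;

        Status(String deviceCode, String st, long timestampMillis) {
            this.deviceCode = deviceCode;
            this.st = st;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Returns the devices that have two or more statuses with an identical timestamp. */
    static Set<String> devicesWithTies(List<Status> statuses) {
        Map<String, Set<Long>> seenPerDevice = new HashMap<>();
        Set<String> tied = new TreeSet<>();
        for (Status s : statuses) {
            Set<Long> seen =
                    seenPerDevice.computeIfAbsent(s.deviceCode, k -> new HashSet<>());
            // add() returns false when this device already had this timestamp.
            if (!seen.add(s.timestampMillis)) {
                tied.add(s.deviceCode);
            }
        }
        return tied;
    }

    public static void main(String[] args) {
        List<Status> logged = Arrays.asList(
                new Status("dev-1", "none", 1000L),
                new Status("dev-2", "none", 1000L),    // tie across devices only
                new Status("dev-1", "started", 1000L), // tie within dev-1
                new Status("dev-2", "started", 1020L));
        System.out.println(devicesWithTies(logged)); // prints [dev-1]
    }
}
```

An empty result would mean the ties are only across devices; a non-empty one
would show that some device has both statuses at the same timestamp.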
Any idea?
Thanks, regards
Paolo