Paolo Rendano created FLINK-7549:
------------------------------------

             Summary: CEP - Pattern not discovered if source streaming is very fast
                 Key: FLINK-7549
                 URL: https://issues.apache.org/jira/browse/FLINK-7549
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.2, 1.3.1
            Reporter: Paolo Rendano


Hi all,
I'm running some stress tests on my pattern, using JMeter to populate source 
data on a RabbitMQ queue. This queue contains statuses generated by different 
devices. In my test case I loop for 1000 cycles, each one sending the first and 
then the second status that together generate the event using Flink CEP 
(statuses are keyed by device).
In my early tests I noticed that I get only partial results in output (70-80% 
of the expected ones). Introducing a delay in the JMeter plan between sending 
the two statuses solved the problem. The minimum delay that makes things work 
is 20-25 ms (this is on my local machine, of course; it may vary on other 
machines).

My code is structured this way (the following is a simplification):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100L);

// SOURCE DEFINITION
DataStream<MyMessageWrapper> dataStreamSource = env
        .addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
                conf.getSourceExchange(),
                conf.getSourceRoutingKey(),
                conf.getSourceQueueName(),
                true,
                new MyMessageWrapperSchema()))
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
                    private static final long serialVersionUID = -1L;

                    @Override
                    public long extractTimestamp(MyMessageWrapper element) {
                        if (element.getData().get("stateTimestamp") == null) {
                            throw new RuntimeException(
                                    "Status Timestamp is null during time ordering for device ["
                                            + element.getData().get("deviceCode") + "]");
                        }
                        return FlinkUtils.getTimeFromJsonTimestamp(
                                element.getData().get("stateTimestamp")).getTime();
                    }
                })
        .name("MyIncomingStatus");

// PATTERN DEFINITION
Pattern<MyMessageWrapper, ?> myPattern = Pattern
        .<MyMessageWrapper>begin("start")
                .subtype(MyMessageWrapper.class)
                .where(whereEquals("st", "none"))
        .next("end")
                .subtype(MyMessageWrapper.class)
                .where(whereEquals("st", "started"))
        .within(Time.minutes(3));

// CEP DEFINITION
PatternStream<MyMessageWrapper> myPatternStream =
        CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);

DataStream<Either<TimeoutEvent, MyMessageWrapper>> outputStream =
        myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);

// SINK DEFINITION
outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange,
        new MyMessageWrapperSchema())).name("MyGeneratedEvent");
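
For context on the watermark settings in the source definition, here is a 
minimal sketch of how a bounded-out-of-orderness watermark trails the highest 
timestamp seen. It is a simplified plain-Java model, not Flink's 
implementation; the class WatermarkModel and its methods are hypothetical 
names. It assumes the watermark is the maximum observed timestamp minus the 
bound (one minute in the code above), and that in event time the CEP operator 
evaluates an element only once the watermark has reached its timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a bounded-out-of-orderness watermark; not Flink code. */
public class WatermarkModel {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    public WatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first subtraction cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records one event timestamp, as a timestamp extractor would per element. */
    public void observe(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    /** Watermark that a periodic emission would report at this moment (assumed formula). */
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** Returns the input timestamps that are still ahead of the final watermark. */
    public static List<Long> stillBuffered(long[] timestampsMs, long maxOutOfOrdernessMs) {
        WatermarkModel model = new WatermarkModel(maxOutOfOrdernessMs);
        for (long ts : timestampsMs) {
            model.observe(ts);
        }
        List<Long> pending = new ArrayList<>();
        for (long ts : timestampsMs) {
            if (ts > model.currentWatermark()) {
                pending.add(ts);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        // A short burst of events spread over 15 ms, with a one-minute bound.
        long[] burst = {1_000L, 1_005L, 1_010L, 1_015L};
        System.out.println(stillBuffered(burst, 60_000L));
    }
}
```

Running main prints [1000, 1005, 1010, 1015]: under this model every event in 
the burst is still ahead of the watermark until later events move it forward. 
Whether this has any bearing on the missing matches is not established here.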

Digging in and logging the messages received by Flink in "extractTimestamp", 
what happens is that at such a high message rate, the source may receive 
messages with the same timestamp but with different deviceCode values. 
Any idea?
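
Since the stream is keyed by device, a related check on the logged messages is 
whether two statuses of the same device ever share a millisecond timestamp. The 
sketch below is one way to do that check in plain Java. The Status class and 
the devicesWithTies method are hypothetical names for illustration, and the 
idea that same-device ties could affect the "next" step is an assumption, not a 
confirmed diagnosis.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Finds devices whose logged statuses share a millisecond timestamp. */
public class TimestampTies {

    /** Hypothetical stand-in for one logged status message. */
    public static final class Status {
        final String deviceCode;
        final String st;
        final long timestampMs;

        public Status(String deviceCode, String st, long timestampMs) {
            this.deviceCode = deviceCode;
            this.st = st;
            this.timestampMs = timestampMs;
        }
    }

    /** Returns, in sorted order, the device codes with two or more statuses on one timestamp. */
    public static List<String> devicesWithTies(List<Status> statuses) {
        Map<String, Set<Long>> seenPerDevice = new HashMap<>();
        Set<String> tied = new TreeSet<>();
        for (Status s : statuses) {
            Set<Long> seen = seenPerDevice.computeIfAbsent(s.deviceCode, k -> new HashSet<>());
            // add() returns false when this device already had that timestamp.
            if (!seen.add(s.timestampMs)) {
                tied.add(s.deviceCode);
            }
        }
        return new ArrayList<>(tied);
    }

    public static void main(String[] args) {
        List<Status> log = Arrays.asList(
                new Status("A", "none", 1_000L),
                new Status("A", "started", 1_000L),  // same millisecond as A's first status
                new Status("B", "none", 1_000L),     // same timestamp, different device
                new Status("B", "started", 1_020L));
        System.out.println(devicesWithTies(log));
    }
}
```

Running main prints [A]: device B shares a timestamp with device A but has no 
tie of its own, so only A is reported.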

Thanks, regards
Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
