[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paolo Rendano updated FLINK-7549:
---------------------------------
    Description: 
Hi all,
I'm stress-testing my pattern, using JMeter to publish source data to a 
RabbitMQ queue. The queue contains statuses generated by different devices. 
My test plan loops for 1000 cycles; each cycle sends the first and then the 
second status, which together should generate the event through Flink CEP 
(statuses are keyed by device). 
In my early tests I got only partial results in the output (70-80% of the 
expected events). Adding a delay in the JMeter plan between sending the two 
statuses solved the problem. The minimum delay that makes things work is 
20-25 ms (this is on my local machine; it may vary on other machines).

My code is structured this way (the following is a simplification):


{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100L);

// source definition
DataStream<MyMessageWrapper> dataStreamSource = env
        .addSource(new MYRMQAutoboundQueueSource<>(connectionConfig,
                conf.getSourceExchange(),
                conf.getSourceRoutingKey(),
                conf.getSourceQueueName(),
                true,
                new MyMessageWrapperSchema()))
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
                    private static final long serialVersionUID = -1L;

                    @Override
                    public long extractTimestamp(MyMessageWrapper element) {
                        if (element.getData().get("stateTimestamp") == null) {
                            throw new RuntimeException(
                                    "Status Timestamp is null during time ordering for device ["
                                            + element.getData().get("deviceCode") + "]");
                        }
                        return FlinkUtils
                                .getTimeFromJsonTimestamp(element.getData().get("stateTimestamp"))
                                .getTime();
                    }
                })
        .name("MyIncomingStatus");

// PATTERN DEFINITION
Pattern<MyMessageWrapper, ?> myPattern = Pattern
        .<MyMessageWrapper>begin("start")
                .subtype(MyMessageWrapper.class)
                .where(whereEquals("st", "none"))
        .next("end")
                .subtype(MyMessageWrapper.class)
                .where(whereEquals("st", "started"))
        .within(Time.minutes(3));

// CEP DEFINITION
PatternStream<MyMessageWrapper> myPatternStream =
        CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);

DataStream<Either<TimeoutEvent, MyMessageWrapper>> outputStream =
        myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);

// SINK DEFINITION
outputStream
        .addSink(new MYRMQExchangeSink<>(connectionConfig, outputExchange,
                new MyMessageWrapperSchema()))
        .name("MyGeneratedEvent");
{code}
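
For reference, the watermark arithmetic implied by the configuration above 
(a 1-minute out-of-orderness bound) can be sketched in plain Java with no 
Flink dependency. This is only an illustration of the arithmetic, not a 
diagnosis: it assumes the watermark is the largest timestamp seen so far minus 
the bound, and that an event-time operator such as CEP handles a buffered 
element only once the watermark has reached that element's timestamp. The 
class and method names (BoundedWatermarkSketch, observe, isReleased) are 
hypothetical and do not exist in Flink.

```java
/**
 * Hypothetical, Flink-free sketch of bounded-out-of-orderness watermarking.
 * Assumption: watermark = max timestamp seen so far - allowed lateness bound.
 */
public class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first subtraction cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Record the event timestamp of an incoming element. */
    public void observe(long eventTimestampMs) {
        if (eventTimestampMs > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestampMs;
        }
    }

    /** Watermarks never move backwards. */
    public long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate >= lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    /** Assumed rule: an element is handled once the watermark reaches it. */
    public boolean isReleased(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(60_000L);
        long first = 1_000_000L;      // first status of a device
        long second = first + 25L;    // second status, 25 ms later
        wm.observe(first);
        wm.observe(second);
        System.out.println(wm.currentWatermark());  // 940025
        System.out.println(wm.isReleased(second));  // false
        wm.observe(second + 60_000L);               // an event one minute later
        System.out.println(wm.isReleased(second));  // true
    }
}
```

Under these assumptions, the two statuses of a device stay buffered until some 
later element carries a timestamp at least one minute newer, so a short test 
run may end before the last pairs are evaluated.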

Digging in and logging the messages received by Flink in "extractTimestamp", 
I see that at such a high message rate the source may receive messages with 
the same timestamp but different deviceCode values. 
Any ideas?

Thanks, regards
Paolo


> CEP - Pattern not discovered if source streaming is very fast
> -------------------------------------------------------------
>
>                 Key: FLINK-7549
>                 URL: https://issues.apache.org/jira/browse/FLINK-7549
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Paolo Rendano
>


