[ https://issues.apache.org/jira/browse/FLINK-5753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010121#comment-16010121 ]
Michał Jurkiewicz commented on FLINK-5753:
------------------------------------------
Hi [~kkl0u],
Here is what I set:
{code}
myInputStream.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator());
{code}
{code}
/**
 * This generator generates watermarks that are lagging behind processing time by a certain amount.
 * It assumes that elements arrive in Flink after at most a certain time.
 */
private static class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<Event> {

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The Constant OUT_OF_ORDERNESS_THRESHOLD. */
    private static final int OUT_OF_ORDERNESS_THRESHOLD_MILLIS = 5000;

    /* (non-Javadoc)
     * @see org.apache.flink.streaming.api.functions.TimestampAssigner#extractTimestamp(java.lang.Object, long)
     */
    @Override
    public long extractTimestamp(Event event, long previousElementTimestamp) {
        return event.getEventTimestamp().getTime();
    }

    /* (non-Javadoc)
     * @see org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks#getCurrentWatermark()
     */
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - OUT_OF_ORDERNESS_THRESHOLD_MILLIS);
    }
}
{code}
Should I also add this configuration?
{code}env.getConfig().setAutoWatermarkInterval(100L);{code}
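
To make the timing explicit, here is a minimal plain-Java sketch of the arithmetic behind the generator above. It is a simplified model and not Flink code: the class {{TimeLagSketch}} and its methods are hypothetical names, and the rule that a partial match times out once the watermark reaches the first event's timestamp plus the {{within}} window is an assumption about how an event-time timeout would be evaluated.

```java
/** Simplified model (not Flink code): a time-lag watermark and a watermark-driven timeout check. */
final class TimeLagSketch {

    /** Same lag as OUT_OF_ORDERNESS_THRESHOLD_MILLIS in the generator above. */
    static final long LAG_MILLIS = 5_000L;

    /** Same window as within(Time.seconds(30)) in the quoted pattern. */
    static final long WITHIN_MILLIS = 30_000L;

    /** Watermark = processing time minus the fixed lag; it advances even when no events arrive. */
    static long watermarkAt(long processingTimeMillis) {
        return processingTimeMillis - LAG_MILLIS;
    }

    /** Assumption: a partial match is timed out once the watermark reaches start + window. */
    static boolean isTimedOut(long firstEventTimestamp, long watermark) {
        return watermark >= firstEventTimestamp + WITHIN_MILLIS;
    }

    public static void main(String[] args) {
        long firstEvent = 1_000_000L; // hypothetical event timestamp in milliseconds
        // Step processing time forward in 10 s increments with no further events.
        for (long now = firstEvent; now <= firstEvent + 40_000L; now += 10_000L) {
            long watermark = watermarkAt(now);
            System.out.println("now=" + now
                    + " watermark=" + watermark
                    + " timedOut=" + isTimedOut(firstEvent, watermark));
        }
    }
}
```

Under these assumptions the first event would be eligible for timeout about 35 seconds of processing time after its own timestamp (30 s window plus 5 s lag), provided the advancing watermark is actually emitted and acted on while the stream is idle.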
> CEP timeout handler.
> --------------------
>
> Key: FLINK-5753
> URL: https://issues.apache.org/jira/browse/FLINK-5753
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.1.2
> Reporter: Michał Jurkiewicz
> Assignee: Kostas Kloudas
>
> I configured the following Flink job in my environment:
> {code}
> Pattern<Event, ?> patternCommandStarted = Pattern.<Event>begin("event-accepted")
>     .subtype(Event.class)
>     .where(e -> {event accepted where statement})
>     .next("second-event-started")
>     .subtype(Event.class)
>     .where(e -> {event started where statement})
>     .within(Time.seconds(30));
>
> DataStream<Either<Event, Event>> events = CEP
>     .pattern(eventsStream.keyBy(e -> e.getEventProperties().get("deviceCode")), patternCommandStarted)
>     .select(eventSelector, eventSelector);
>
> static class EventSelector implements PatternSelectFunction<Event, Event>,
>         PatternTimeoutFunction<Event, Event> {}
> {code}
> The problem I have is related to timeout handling. I observed that
> if the first event appears, the second event does not appear in the stream,
> and *no new events appear in the stream*, the timeout handler is not executed.
> Expected result: the timeout handler should be executed even if no
> new events appear in the stream.