Hi,Fabian I am using flink CEP library with event time, but there is no output( the java code performed as expected, but scala did not) .My code is here:
object EventTimeTest extends App { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment() env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val input: DataStream[Event] = env.fromElements(new Event(1, "aa", DateUtils.dt2timestamp("2018-05-14 10:29:15.000000")), new Event(1, "ab", DateUtils.dt2timestamp("2018-05-14 10:29:25.000000")), new Event(3, "ac", DateUtils.dt2timestamp("2018-05-14 10:29:35.000000")), new Event(4, "ad", DateUtils.dt2timestamp("2018-05-14 10:29:45.000000")), new Event(5, "ae", DateUtils.dt2timestamp("2018-05-14 10:29:55.000000"))) input.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(100)) { override def extractTimestamp(element: Event): Long = { val ts = DateUtils.dt2long(element.getTs) println(ts) ts } }).setParallelism(1) val partitionedInput: KeyedStream[Event, Long] = input.keyBy(event => event.getId) val pattern: Pattern[Event, Event] = Pattern.begin("start") .subtype(classOf[Event]) .where(_.getName.startsWith("a")).within(Time.seconds(30)) val patternStream = CEP.pattern(partitionedInput, pattern) val alerts = patternStream.select(patternSelectFun => { val startEvent: Event = patternSelectFun("start").head println(startEvent.getName) startEvent.getName }) alerts.print() env.execute("start") } The java code is : public class EventTest { public static void main(String[] s) { LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); DataStream<Event> input = env.fromElements( new Event(1, "aa", "2018-05-14 10:29:15"), new Event(1, "ab", "2018-05-14 10:29:25"), new Event(3, "ac", "2018-05-14 10:29:35"), new Event(4, "ad", "2018-05-14 10:29:45"), new Event(5, "ae", "2018-05-14 10:29:55")); DataStream<Event> withTimestampsAndWatermarks = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) { @Override public long extractTimestamp(Event element) { try { Date dt = formatter.parse(element.umsTs); return dt.getTime(); } catch (ParseException e) { e.printStackTrace(); return 0; } } }); KeyedStream<Event, Long> partitionedInput = withTimestampsAndWatermarks.keyBy(new KeySelector<Event, Long>() { public Long getKey(Event e) { return e.id<http://e.id>; } }); Pattern<Event, Event> pattern = Pattern.<Event>begin("start") .subtype(Event.class) .where(new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.name.startsWith("a"); } }).within(Time.seconds(30)); PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern); DataStream<List<Event>> alerts = patternStream.select( new PatternSelectFunction<Event, List<Event>>() { @Override public List<Event> select(Map<String, List<Event>> pattern) { List<Event> startEvent = pattern.get("start"); System.out.println("name:"+startEvent.get(0).name); return startEvent; } } ); alerts.print(); try { env.execute("start"); } catch (Exception e) { e.printStackTrace(); } } } Thanks! sensun