Hi,Fabian

      I am using flink CEP library with event time, but there is no output( the 
java code performed as expected, but scala did not) .My code is here:


object EventTimeTest extends App {
  val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.createLocalEnvironment()
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: DataStream[Event] = env.fromElements(new Event(1, "aa", 
DateUtils.dt2timestamp("2018-05-14 10:29:15.000000")),
    new Event(1, "ab", DateUtils.dt2timestamp("2018-05-14 10:29:25.000000")),
    new Event(3, "ac", DateUtils.dt2timestamp("2018-05-14 10:29:35.000000")),
    new Event(4, "ad", DateUtils.dt2timestamp("2018-05-14 10:29:45.000000")),
    new Event(5, "ae", DateUtils.dt2timestamp("2018-05-14 10:29:55.000000")))


  input.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(100)) {
      override def extractTimestamp(element: Event): Long = {
        val ts = DateUtils.dt2long(element.getTs)
        println(ts)
        ts
      }
    }).setParallelism(1)

  val partitionedInput: KeyedStream[Event, Long] = input.keyBy(event => 
event.getId)

  val pattern: Pattern[Event, Event] = Pattern.begin("start")
    .subtype(classOf[Event])
    .where(_.getName.startsWith("a")).within(Time.seconds(30))

  val patternStream = CEP.pattern(partitionedInput, pattern)

  val alerts = patternStream.select(patternSelectFun => {
    val startEvent: Event = patternSelectFun("start").head
    println(startEvent.getName)
    startEvent.getName
  })


  alerts.print()

  env.execute("start")

}



The java code is :



public class EventTest {

    public static void main(String[] s) {

        LocalStreamEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");

        DataStream<Event> input = env.fromElements(
                new Event(1, "aa", "2018-05-14 10:29:15"),
                new Event(1, "ab", "2018-05-14 10:29:25"),
                new Event(3, "ac", "2018-05-14 10:29:35"),
                new Event(4, "ad", "2018-05-14 10:29:45"),
                new Event(5, "ae", "2018-05-14 10:29:55"));


        DataStream<Event> withTimestampsAndWatermarks =
                input.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Event element) {
                        try {
                            Date dt = formatter.parse(element.umsTs);
                            return dt.getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            return 0;
                        }
                    }
                });

        KeyedStream<Event, Long> partitionedInput = 
withTimestampsAndWatermarks.keyBy(new KeySelector<Event, Long>() {
            public Long getKey(Event e) {
                return e.id<http://e.id>;
            }
        });

        Pattern<Event, Event> pattern = Pattern.<Event>begin("start")
                .subtype(Event.class)
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return event.name.startsWith("a");
                    }
                }).within(Time.seconds(30));

        PatternStream<Event> patternStream = CEP.pattern(partitionedInput, 
pattern);

        DataStream<List<Event>> alerts = patternStream.select(
                new PatternSelectFunction<Event, List<Event>>() {
                    @Override
                    public List<Event> select(Map<String, List<Event>> pattern) 
{
                        List<Event> startEvent = pattern.get("start");
                        System.out.println("name:"+startEvent.get(0).name);
                        return startEvent;
                    }
                }
        );
        alerts.print();

        try {
            env.execute("start");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }


}




Thanks!
sensun

Reply via email to