Hi Till, thank you. here's the code: public class CepStorzSimulator { public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); if(parameterTool.getNumberOfParameters() < 3) { System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> --bootstrap.servers <kafka brokers> --group.id <some id>"); System.exit(1); } CepStorzSimulator reader = new CepStorzSimulator(); reader.run(parameterTool); } public void run(ParameterTool parameterTool) throws Exception { String topic = "test-simulator";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //env.getConfig().disableSysoutLogging(); env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 5000)); //env.enableCheckpointing(15000); // create a checkpoint every 5 seconds env.setParallelism(4); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), parameterTool.getProperties())); DataStream<Tuple5<String, String, String, String, Double>> data = kafkaStream.flatMap(new SplitMapper()); SingleOutputStreamOperator<Tuple6<String, String, String, Double, Double, Double>> windowedData = data.filter(new FilterFunction<Tuple5<String, String, String, String, Double>>() { private static final long serialVersionUID = -5952425756492833594L; @Override public boolean filter(Tuple5<String, String, String, String, Double> val) throws Exception { return val.f3.contains("target - Value"); } }) .keyBy(3) .timeWindow(Time.seconds(10), Time.seconds(1)) .fold(new Tuple6<>("", "", "", 0.0d, 0.0d, 0.0d), new pressureElementCount()); windowedData.print(); Pattern<Tuple6<String, String, String, Double, Double, Double>, ?> FlowFirstPattern = Pattern.<Tuple6<String, String, String, Double, Double, Double>>begin("FlowOver10") .where(new FilterFunction<Tuple6<String, String, String, Double, Double, Double>>() { private static final long serialVersionUID = 5861517245439863889L; @Override public boolean filter(Tuple6<String, String, String, Double, Double, Double> value) throws Exception { double avgFlow= (value.f5/value.f4); return value.f2.contains("Flow target - Value") && avgFlow > 25.0;// && (value.f2 > avgFlow*1.0); } }) .followedBy("PressureOver10").where(new FilterFunction<Tuple6<String, String, String, Double, Double, Double>>() { private static final long serialVersionUID = -4037517308930307522L; @Override public boolean filter(Tuple6<String, String, String, Double, Double, Double> value) throws Exception { double avgPressure = (value.f5/value.f4); //System.out.println("Pressure: " + avgPressure); return value.f2.equals("Pressure target - Value") && (avgPressure > 5.0);// && (value.f2 > avgPressure*1.0); } }) .within(Time.seconds(10)); PatternStream<Tuple6<String, String, String, Double, Double, Double>> FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern); DataStream<String> warning = FlowFirstPatternStream.select(new PlacingWorkingTrocarWarning()); warning.print(); env.execute(); } private static class PlacingWorkingTrocarWarning implements PatternSelectFunction<Tuple6<String, String, String, Double, Double, Double>, String> { private static final long serialVersionUID = 2576609635170800026L; @Override public String select(Map<String, Tuple6<String, String, String, Double, Double, Double>> pat) throws Exception { //Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10"); //Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10"); return " ####### Warning! FlowEvent ####### "; } } private static class pressureElementCount implements FoldFunction<Tuple5<String, String, String, String, Double>, Tuple6<String, String, String, Double, Double, Double>>{ private static final long serialVersionUID = -1081752808506520154L; @Override public Tuple6<String, String, String, Double, Double, Double> fold(Tuple6<String, String, String, Double, Double, Double> init, Tuple5<String, String, String, String, Double> val) throws Exception { double count = init.f4+1.0d; double sum = init.f5+val.f4; //!!! return new Tuple6<>(val.f0, val.f1, val.f3, val.f4, count, sum); } } private static class SplitMapper extends RichFlatMapFunction<String, Tuple5<String, String, String, String, Double>> { private static final long serialVersionUID = 7297664214330222193L; @Override public void flatMap(String msg, Collector<Tuple5<String, String, String, String, Double>> out) throws Exception { EncodeValues enc = new EncodeValues(); getRuntimeContext().getLongCounter("eventCount").add(1L); String[] split_msg = msg.split("\t"); String DeviceId = split_msg[1]; String [] array = split_msg[3].split(", \""); for(String a:array){ String[] split = a.split(":"); String val = split[1]; String testname = "test1"; String nom = val.replace("\"", "").replace("{", "").replace("}", "").replace(",", "."); String param = split[0].replace("\"", "").replace("{", "").replace("}", ""); double codedVal = enc.encode(nom); out.collect(new Tuple5<String, String, String, String, Double>(UUID.randomUUID().toString(), testname, DeviceId, param, codedVal)); } } } } Example data looks like this: 1> 00:36:06.459 1 2121 {"Pressure target - Value":"6", "Pressure target - Unit":"mmHg"} 1> 00:36:06.463 1 2121 {"Flow target - Value":"23", "Flow target - Unit":"l/min"} -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5986.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.