Hi Till,
thank you. here's the code:

public class CepStorzSimulator {
        
        public static void main(String[] args) throws Exception {
                
                        final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
        
                        if(parameterTool.getNumberOfParameters() < 3) {
                                System.out.println("Missing parameters!\nUsage: 
Kafka --topic <topic>
--bootstrap.servers <kafka brokers> --group.id <some id>");
                                System.exit(1);
                        }
                                                
                        CepStorzSimulator reader = new CepStorzSimulator();
                reader.run(parameterTool);
        }
        
        public void run(ParameterTool parameterTool) throws Exception {
                
                String topic = "test-simulator";        

                StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
                //env.getConfig().disableSysoutLogging();
                
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
5000));
                //env.enableCheckpointing(15000);                               
                                        // create a checkpoint every 5
seconds
                env.setParallelism(4);
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
                
        DataStream<String> kafkaStream = env.addSource(new
FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(),
parameterTool.getProperties()));

        DataStream<Tuple5&lt;String, String, String, String, Double>> data =
kafkaStream.flatMap(new SplitMapper());     
        
        SingleOutputStreamOperator<Tuple6&lt;String, String, String, Double,
Double, Double>> windowedData = 
                                 data.filter(new 
FilterFunction<Tuple5&lt;String, String, String,
String, Double>>() {

                                        private static final long 
serialVersionUID = -5952425756492833594L;

                                                @Override
                                                public boolean 
filter(Tuple5<String, String, String, String, Double>
val) throws Exception {

                                                        return 
val.f3.contains("target - Value");
                                                }
                                         })
                                         .keyBy(3)
                                         .timeWindow(Time.seconds(10), 
Time.seconds(1))
                                         .fold(new Tuple6<>("", "", "", 0.0d, 
0.0d, 0.0d), new
pressureElementCount());
                                         
                windowedData.print();
                
                Pattern<Tuple6&lt;String, String, String, Double, Double, 
Double>, ?>
FlowFirstPattern = 
                                Pattern.<Tuple6&lt;String, String, String, 
Double, Double,
Double>>begin("FlowOver10")
                            .where(new FilterFunction<Tuple6&lt;String, String, 
String, Double,
Double, Double>>() {

                                private static final long serialVersionUID = 
5861517245439863889L;

                                        @Override
                                        public boolean filter(Tuple6<String, 
String, String, Double, Double,
Double> value) throws Exception {

                                                double avgFlow= 
(value.f5/value.f4);
                                                
                                                return value.f2.contains("Flow 
target - Value") && avgFlow > 25.0;//
&& (value.f2 > avgFlow*1.0);
                                        }
                                })
                            .followedBy("PressureOver10").where(new
FilterFunction<Tuple6&lt;String, String, String, Double, Double, Double>>()
{

                                        private static final long 
serialVersionUID = -4037517308930307522L;

                                        @Override
                                        public boolean filter(Tuple6<String, 
String, String, Double, Double,
Double> value) throws Exception {
                                                
                                                double avgPressure = 
(value.f5/value.f4);
                                                //System.out.println("Pressure: 
" + avgPressure);
                                                
                                                return 
value.f2.equals("Pressure target - Value") && (avgPressure >
5.0);// && (value.f2 > avgPressure*1.0);
                                        }
                                })
                            .within(Time.seconds(10));  
                
                PatternStream<Tuple6&lt;String, String, String, Double, Double, 
Double>>
FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
                DataStream<String> warning = FlowFirstPatternStream.select(new
PlacingWorkingTrocarWarning());
                warning.print();                
                
                env.execute();
        }
        
        private static class PlacingWorkingTrocarWarning implements
PatternSelectFunction<Tuple6&lt;String, String, String, Double, Double,
Double>, String> {

                private static final long serialVersionUID = 
2576609635170800026L;

                @Override
                public String select(Map<String, Tuple6&lt;String, String, 
String, Double,
Double, Double>> pat) throws Exception {

                        //Tuple5<String, String, Double, Double, Double> 
pressure =
pat.get("PressureOver10");
                        //Tuple5<String, String, Double, Double, Double> flow =
pat.get("FlowOver10");
                
                        return "  #######   Warning! FlowEvent   ####### ";     
                
                }
        }
        
        private static class pressureElementCount implements
FoldFunction<Tuple5&lt;String, String, String, String, Double>,
Tuple6<String, String, String, Double, Double, Double>>{

                private static final long serialVersionUID = 
-1081752808506520154L;

                @Override
                public Tuple6<String, String, String, Double, Double, Double>
fold(Tuple6<String, String, String, Double, Double, Double> init,
Tuple5<String, String, String, String, Double> val) throws Exception {
                        
                        double count = init.f4+1.0d;
                        double sum = init.f5+val.f4; //!!!
                        return new Tuple6<>(val.f0, val.f1, val.f3, val.f4, 
count, sum);
                }               
        }
        
        private static class SplitMapper extends RichFlatMapFunction<String,
Tuple5&lt;String, String, String, String, Double>> {

                private static final long serialVersionUID = 
7297664214330222193L;
                
                @Override
                public void flatMap(String msg, Collector<Tuple5&lt;String, 
String,
String, String, Double>> out) throws Exception {
                        
                        EncodeValues enc = new EncodeValues();                  
                        
getRuntimeContext().getLongCounter("eventCount").add(1L);
                        String[] split_msg = msg.split("\t");
                        String DeviceId = split_msg[1];
                        String [] array = split_msg[3].split(", \"");
                                                                        
                        for(String a:array){
                                                                        
                                        String[] split = a.split(":");
                                        String val = split[1];
                                        String testname = "test1";

                                        String nom = val.replace("\"", 
"").replace("{", "").replace("}",
"").replace(",", ".");
                                        String param = split[0].replace("\"", 
"").replace("{", "").replace("}",
"");
                                                                                
                                        double codedVal = enc.encode(nom);
                                        
                                        out.collect(new Tuple5<String, String, 
String, String,
Double>(UUID.randomUUID().toString(), testname, DeviceId, param, codedVal));
                        }                       
                }
        }       
}


Example data looks like this:
1> 00:36:06.459 1       2121    {"Pressure target - Value":"6", "Pressure 
target -
Unit":"mmHg"}
1> 00:36:06.463 1       2121    {"Flow target - Value":"23", "Flow target -
Unit":"l/min"}





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5986.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to