Hello,
 
I’ve already asked the question today and got the solve:  
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html
 , and it’s clean for me how PatternStream works with ProcessTime.
 
But I need help again, I can’t write proper code to execute PatternStream with 
EventTime regime.
I think the problem is how I assign the watermark strategy.
 
My code is below, version of Flink is 1.12:
 
public class Main {
 
    public static void main(String[] args) throws Exception {
 
        Properties properties = new Properties();
        properties.put("group.id", "Flink");
        properties.put("bootstrap.servers", "broker:9092");
 
 
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test",
                new SimpleStringSchema(),
                properties);
 
        DataStream<String> stream = env
                .addSource(consumer)
                .map((MapFunction<String, String>) s -> {
                    //  Just getting an object model
                    return model.toString();
                }). 
assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((event, timestamp) -> {
                            Model model = new Gson().fromJson(event, 
Model.class);
                            return model.getServerTime();
                         }));
 
        stream.print("Stream");
 
 
 
        Pattern<String, String> firstPattern = Pattern
                .<String>begin("first")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String s, Context<String> context) 
throws Exception {
                        return s.contains("Start");
                    }
                });
 
        DataStream<String> result = CEP
                .pattern(stream, firstPattern)
                 .inEventTime() //  default TimeCharacteristic for 1.12
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(Map<String, List<String>> map, 
Context context, Collector<String> collector) throws Exception {
                        collector.collect(map.get("first").get(0));
                    }
                });
 
        result.print("Result");
 
        env.execute();
    }
 
}
 
Please, help me to correct the code )
 
Thanks, Yuri L.
 Ответить
 Переслать
 Предложить звонок
 Создать событие
Принято Хорошо Все понятно, спасибо за информацию
 
 
 

Reply via email to