[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16119868#comment-16119868 ]
ASF GitHub Bot commented on FLINK-7169: --------------------------------------- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132175418 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java --- @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; +import org.apache.flink.cep.pattern.MalformedPatternException; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.List; +import java.util.Map; + +/** + * After match skip tests. 
+ */ +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath; + private String expected; + + private String lateEventPath; + private String expectedLateEvents; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + expected = ""; + + lateEventPath = tempFolder.newFile().toURI().toString(); + expectedLateEvents = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + compareResultsByLinesInMemory(expectedLateEvents, lateEventPath); + } + + private PatternSelectFunction<Event, String> newIdSelectFunction(String ... names) { + return new PatternSelectFunction<Event, String>() { + + @Override + public String select(Map<String, List<Event>> pattern) { + StringBuilder builder = new StringBuilder(); + for (String name: names) { + for (Event e : pattern.get(name)) { + builder.append(e.getId()).append(","); + } + } + return builder.toString(); + } + }; + } + + @Test + public void testSkipToNext() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "a", 0.0), + new Event(2, "a", 0.0), + new Event(3, "a", 0.0), + new Event(4, "a", 0.0), + new Event(5, "a", 0.0), + new Event(6, "a", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("start", + new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT)) + .where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(3); + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start")); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // expected sequence of matching event ids + expected = 
"1,2,3,\n2,3,4,\n3,4,5,\n4,5,6,"; + + env.execute(); + } + + @Test + public void testSkipPastLast() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "a", 0.0), + new Event(2, "a", 0.0), + new Event(3, "a", 0.0), + new Event(4, "a", 0.0), + new Event(5, "a", 0.0), + new Event(6, "a", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("start", + new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(3); + + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start")); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // expected sequence of matching event ids + expected = "1,2,3,\n4,5,6,"; + + env.execute(); + } + + @Test + public void testSkipToFirst() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "ab", 0.0), + new Event(2, "ab", 0.0), + new Event(3, "ab", 0.0), + new Event(4, "ab", 0.0), + new Event(5, "ab", 0.0), + new Event(6, "ab", 0.0), + new Event(7, "ab", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("start", + new AfterMatchSkipStrategy( + AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "end")) + .where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("a"); + } + }).times(2).next("end").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("b"); + } + }).times(2); + + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end")); + + 
result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // expected sequence of matching event ids + expected = "1,2,3,4,\n3,4,5,6,"; + + env.execute(); + } + + @Test + public void testSkipToLast() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "ab", 0.0), + new Event(2, "ab", 0.0), + new Event(3, "ab", 0.0), + new Event(4, "ab", 0.0), + new Event(5, "ab", 0.0), + new Event(6, "ab", 0.0), + new Event(7, "ab", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy( + AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")).where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("a"); + } + }).times(2).next("end").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("b"); + } + }).times(2); + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end")); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // expected sequence of matching event ids + expected = "1,2,3,4,\n4,5,6,7,"; + + env.execute(); + } + + @Test(expected = MalformedPatternException.class) + public void testSkipToLastWithEmptyException() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "ab", 0.0), + new Event(2, "c", 0.0), + new Event(3, "ab", 0.0), + new Event(4, "c", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy( + AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "middle")).where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("a"); + } + 
}).next("middle").where( + new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("d"); + } + } + ).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("c"); + } + }); + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end")); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + } + + @Test(expected = MalformedPatternException.class) + public void testSkipToLastWithInfiniteLoopException() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "ab", 0.0), + new Event(2, "c", 0.0), + new Event(3, "ab", 0.0), + new Event(4, "c", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy( + AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "start")).where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("a"); + } + }).next("middle").where( + new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("d"); + } + } + ).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("c"); + } + }); + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end")); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + } + + @Test(expected = MalformedPatternException.class) + public void testSkipToFirstWithInfiniteLoopException() throws Exception { + StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "ab", 0.0), + new Event(2, "c", 0.0), + new Event(3, "ab", 0.0), + new Event(4, "c", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy( + AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "middle")).where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("x"); + } + }).oneOrMore().optional().next("middle").where( + new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("b"); + } + } + ).next("end").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("c"); + } + }); + + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("middle", "end")); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + env.execute(); + } + + @Test + public void testSkipPastLast2() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Event> input = env.fromElements( + new Event(1, "a1", 0.0), + new Event(2, "a2", 0.0), + new Event(3, "b1", 0.0), + new Event(4, "b2", 0.0), + new Event(5, "c1", 0.0), + new Event(6, "c2", 0.0), + new Event(7, "d1", 0.0), + new Event(8, "d2", 0.0) + ); + Pattern<Event, ?> pattern = Pattern.<Event>begin("a", new AfterMatchSkipStrategy( + AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("a"); + } + }).followedByAny("b").where( + new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("b"); + } + } + 
).followedByAny("c").where(new SimpleCondition<Event>() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("c"); + } + }) + .followedByAny("d").where(new SimpleCondition<Event>() { + @Override + public boolean filter(Event value) throws Exception { + return value.getName().contains("d"); + } + }); + + DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("a", "b", "c", "d")); + + result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + + // expected sequence of matching event ids + expected = "1,3,5,7,\n1,3,6,7,\n1,4,5,7,\n1,4,6,7,\n2,3,5,7,\n2,3,6,7,\n2,4,5,7,\n2,4,6,7,"; + + env.execute(); + } + + @Test + public void testSkipPastLast3() throws Exception { --- End diff -- same as above > Support AFTER MATCH SKIP function in CEP library API > ---------------------------------------------------- > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP > Reporter: Yueting Chen > Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support the AFTER MATCH SKIP function in the CEP API. > There are four options for AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function in the `CEP` class, which takes a new > parameter of type AfterMatchSkipStrategy.
> The new API may look like this: > {code} > public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> > pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` the default option, because that is how the CEP library currently behaves. -- This message was sent by Atlassian JIRA (v6.4.14#64029)