[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074518#comment-16074518 ] ASF GitHub Bot commented on FLINK-6927: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4153 > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
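To make the difference between the two example patterns in the issue description concrete, here is a rough analogy using plain Java regular expressions over a string of single-letter event names. This is only a sketch: it does not use the Flink CEP API, the class and constant names are made up for illustration, and a regex enforces strict adjacency, whereas the "-->" in the description need not.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only: regex stand-ins for the two group patterns.
public final class GroupPatternAnalogy {
    // Stand-in for: A --> (B --> C).times(3) --> D  (the whole group repeats three times)
    public static final Pattern GROUP_TIMES = Pattern.compile("a(bc){3}d");

    // Stand-in for: A --> (B --> C.times(3)).optional() --> D  (the whole group may be absent)
    public static final Pattern GROUP_OPTIONAL = Pattern.compile("a(bc{3})?d");

    // Returns true if the complete event string matches the pattern.
    public static boolean matches(Pattern pattern, String events) {
        return pattern.matcher(events).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches(GROUP_TIMES, "abcbcbcd"));  // true
        System.out.println(matches(GROUP_TIMES, "abcccd"));    // false
        System.out.println(matches(GROUP_OPTIONAL, "abcccd")); // true
        System.out.println(matches(GROUP_OPTIONAL, "ad"));     // true
    }
}
```

The point of the analogy is that the quantifier applies to the parenthesised group as a unit, which is what the issue asks CEP to support.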
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073076#comment-16073076 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 @dawidwys Thanks a lot for the review. Updated the doc.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072489#comment-16072489 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4153 @dianfu Thanks for the update. The code looks really nice right now. The only thing missing from this PR is the docs. Could you please add a section about group patterns?
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071577#comment-16071577 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 @dawidwys Thanks a lot for your comments. I have updated the PR; it should address all of them. :)
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071576#comment-16071576 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125178823 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { --- End diff -- updated.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071546#comment-16071546 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176853 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+    @Test
+    public void testGroupFollowedBy() {
+        List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+        Event c = new Event(40, "c", 1.0);
+        Event a1 = new Event(41, "a", 2.0);
+        Event b1 = new Event(42, "b", 3.0);
+        Event a2 = new Event(43, "a", 4.0);
+        Event b2 = new Event(44, "b", 5.0);
+        Event d = new Event(45, "d", 6.0);
+
+        inputEvents.add(new StreamRecord<>(c, 1));
+        inputEvents.add(new StreamRecord<>(a1, 2));
+        inputEvents.add(new StreamRecord<>(b1, 3));
+        inputEvents.add(new StreamRecord<>(a2, 4));
+        inputEvents.add(new StreamRecord<>(b2, 5));
+        inputEvents.add(new StreamRecord<>(d, 6));
+
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("c");
+            }
+        }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("a");
+            }
+        }).followedBy("middle2").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("b");
+            }
+        })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("d");
+            }
+        });
+
+        NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+        final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+        compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+            Lists.newArrayList(c, a1, b1, a2, b2, d)
+
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071545#comment-16071545 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176801 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+    @Test
+    public void testGroupFollowedBy() {
+        List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+        Event c = new Event(40, "c", 1.0);
+        Event a1 = new Event(41, "a", 2.0);
+        Event b1 = new Event(42, "b", 3.0);
+        Event a2 = new Event(43, "a", 4.0);
+        Event b2 = new Event(44, "b", 5.0);
+        Event d = new Event(45, "d", 6.0);
+
+        inputEvents.add(new StreamRecord<>(c, 1));
+        inputEvents.add(new StreamRecord<>(a1, 2));
+        inputEvents.add(new StreamRecord<>(b1, 3));
+        inputEvents.add(new StreamRecord<>(a2, 4));
+        inputEvents.add(new StreamRecord<>(b2, 5));
+        inputEvents.add(new StreamRecord<>(d, 6));
+
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("c");
+            }
+        }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("a");
+            }
+        }).followedBy("middle2").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("b");
+            }
+        })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("d");
+            }
+        });
+
+        NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+        final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+        compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+            Lists.newArrayList(c, a1, b1, a2, b2, d)
+
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071540#comment-16071540 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176457 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+    @Test
+    public void testGroupFollowedBy() {
+        List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+        Event c = new Event(40, "c", 1.0);
+        Event a1 = new Event(41, "a", 2.0);
+        Event b1 = new Event(42, "b", 3.0);
+        Event a2 = new Event(43, "a", 4.0);
+        Event b2 = new Event(44, "b", 5.0);
+        Event d = new Event(45, "d", 6.0);
+
+        inputEvents.add(new StreamRecord<>(c, 1));
+        inputEvents.add(new StreamRecord<>(a1, 2));
+        inputEvents.add(new StreamRecord<>(b1, 3));
+        inputEvents.add(new StreamRecord<>(a2, 4));
+        inputEvents.add(new StreamRecord<>(b2, 5));
+        inputEvents.add(new StreamRecord<>(d, 6));
+
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("c");
+            }
+        }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("a");
+            }
+        }).followedBy("middle2").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("b");
+            }
+        })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("d");
+            }
+        });
+
+        NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+        final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+        compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+            Lists.newArrayList(c, a1, b1, a2, b2, d)
+
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071538#comment-16071538 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176384 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { + + @Test + public void testGroupFollowedBy() { --- End diff -- done
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071537#comment-16071537 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176383 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+    @Test
+    public void testGroupFollowedBy() {
+        List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+        Event c = new Event(40, "c", 1.0);
+        Event a1 = new Event(41, "a", 2.0);
+        Event b1 = new Event(42, "b", 3.0);
+        Event a2 = new Event(43, "a", 4.0);
+        Event b2 = new Event(44, "b", 5.0);
+        Event d = new Event(45, "d", 6.0);
+
+        inputEvents.add(new StreamRecord<>(c, 1));
+        inputEvents.add(new StreamRecord<>(a1, 2));
+        inputEvents.add(new StreamRecord<>(b1, 3));
+        inputEvents.add(new StreamRecord<>(a2, 4));
+        inputEvents.add(new StreamRecord<>(b2, 5));
+        inputEvents.add(new StreamRecord<>(d, 6));
+
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("c");
+            }
+        }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("a");
+            }
+        }).followedBy("middle2").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("b");
+            }
+        })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("d");
+            }
+        });
+
+        NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+        final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+        compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+            Lists.newArrayList(c, a1, b1, a2, b2, d)
+
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071525#comment-16071525 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173912
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
@@ -430,6 +431,54 @@ public Quantifier getQuantifier() {
         return this;
     }
 
+    /**
+     * Starts a new pattern sequence. The provided pattern is the initial pattern
+     * of the new sequence.
+     *
+     * @param group the pattern to begin with
+     * @return the first pattern of a pattern sequence
+     */
+    public static GroupPattern begin(Pattern group) {
+        return new GroupPattern<>(null, group);
+    }
+
+    /**
+     * Appends a new pattern to the existing one. The new pattern enforces non-strict
--- End diff --
done
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071523#comment-16071523 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173876
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---
@@ -153,9 +153,8 @@ public int hashCode() {
     private final int to;
 
     private Times(int from, int to) {
-        Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+        Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
--- End diff --
done.
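The argument check touched by this hunk can be sketched in plain Java as follows. This is a minimal stand-in, not the Flink Quantifier class: the name TimesRange, the factory methods, the getters, and the second check on the upper bound are assumptions made for illustration, and IllegalArgumentException is used in place of Preconditions.checkArgument.

```java
// Hypothetical stand-in for a "times(from, to)" range with validated bounds.
public final class TimesRange {
    private final int from;
    private final int to;

    private TimesRange(int from, int to) {
        // Mirrors the check in the diff: the lower bound must be strictly positive.
        if (from <= 0) {
            throw new IllegalArgumentException("The from should be a positive number greater than 0.");
        }
        // Assumption for this sketch: the upper bound may not be below the lower bound.
        if (to < from) {
            throw new IllegalArgumentException("The to should be greater than or equal to from: " + from);
        }
        this.from = from;
        this.to = to;
    }

    // Range form, e.g. times(2, 4).
    public static TimesRange of(int from, int to) {
        return new TimesRange(from, to);
    }

    // Exact form, e.g. times(3).
    public static TimesRange of(int times) {
        return new TimesRange(times, times);
    }

    public int getFrom() {
        return from;
    }

    public int getTo() {
        return to;
    }
}
```

With a strictly positive lower bound, "zero occurrences" is not expressible through the range itself and would have to be modelled separately, for example with an optional flag.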
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071524#comment-16071524 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173877
--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java ---
@@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception {
     }
 
     @Test
+    public void testTimesRangeFromZero() {
--- End diff --
done
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071522#comment-16071522 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173871
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java ---
@@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) {
         if (usedNames.contains(name)) {
             throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
         }
+        usedNames.add(name);
+    }
+
--- End diff --
updated
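The effect of adding usedNames.add(name) inside the uniqueness check can be illustrated with a small self-contained sketch. The class name NameRegistry is made up, and IllegalStateException stands in for Flink's MalformedPatternException; only the check-then-record logic follows the hunk.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a handler that tracks pattern names.
public final class NameRegistry {
    private final Set<String> usedNames = new HashSet<>();

    // Rejects a name that was seen before; otherwise records it, so that
    // checking and registering happen in a single call.
    public void checkNameUniqueness(String name) {
        if (usedNames.contains(name)) {
            throw new IllegalStateException("Duplicate pattern name: " + name + ". Names must be unique.");
        }
        usedNames.add(name);
    }

    public static void main(String[] args) {
        NameRegistry registry = new NameRegistry();
        registry.checkNameUniqueness("start");
        registry.checkNameUniqueness("middle1");
        try {
            registry.checkNameUniqueness("start");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Recording the name in the same call means a caller cannot forget to register it after the check, which matters once names nested inside a group pattern must also be unique.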
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071520#comment-16071520 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173828
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State loopingState) {
     }
 
     /**
+     * Create all the states for the group pattern.
+     *
+     * @param groupPattern the group pattern to create the states for
+     * @param sinkState the state that the group pattern being converted should point to
+     * @param proceedState the state that the group pattern being converted should proceed to
+     * @param isOptional whether the group pattern being converted is optional
+     * @return the first state of the states of the group pattern
+     */
+    private State createGroupPatternState(
+        final GroupPattern groupPattern,
+        final State sinkState,
+        final State proceedState,
+        final boolean isOptional) {
+        final IterativeCondition trueFunction = BooleanConditions.trueFunction();
+
+        Pattern oldCurrentPattern = currentPattern;
+        Pattern oldFollowingPattern = followingPattern;
+        GroupPattern oldGroupPattern = currentGroupPattern;
+        try {
--- End diff --
updated.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071521#comment-16071521 ]

ASF GitHub Bot commented on FLINK-6927:
---------------------------------------

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173832

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State loopingState) {
     }

     /**
+     * Create all the states for the group pattern.
+     *
+     * @param groupPattern the group pattern to create the states for
+     * @param sinkState the state that the group pattern being converted should point to
+     * @param proceedState the state that the group pattern being converted should proceed to
+     * @param isOptional whether the group pattern being converted is optional
+     * @return the first state of the states of the group pattern
+     */
+    private State createGroupPatternState(
+        final GroupPattern groupPattern,
+        final State sinkState,
+        final State proceedState,
+        final boolean isOptional) {
+        final IterativeCondition trueFunction = BooleanConditions.trueFunction();
+
+        Pattern oldCurrentPattern = currentPattern;
+        Pattern oldFollowingPattern = followingPattern;
+        GroupPattern oldGroupPattern = currentGroupPattern;
+        try {
+            State lastSink = sinkState;
+            currentGroupPattern = groupPattern;
+            currentPattern = groupPattern.getRawPattern();
+            lastSink = createMiddleStates(lastSink);
+            lastSink = convertPattern(lastSink);
+            if (isOptional) {
+                // for the first state of a group pattern, its PROCEED edge should point to
+                // the following state of that group pattern
+                lastSink.addProceed(proceedState, trueFunction);
+            }
+            return lastSink;
+        } finally {
+            currentPattern = oldCurrentPattern;
+            followingPattern = oldFollowingPattern;
+            currentGroupPattern = oldGroupPattern;
+        }
+    }
+
+    /**
+     * Create the states for the group pattern as a looping one.
+     *
+     * @param groupPattern the group pattern to create the states for
+     * @param sinkState the state that the group pattern being converted should point to
+     * @return the first state of the states of the group pattern
+     */
+    private State createLoopingGroupPatternState(
+        final GroupPattern groupPattern,
+        final State sinkState) {
+        final IterativeCondition trueFunction = BooleanConditions.trueFunction();
+
+        Pattern oldCurrentPattern = currentPattern;
+        Pattern oldFollowingPattern = followingPattern;
+        GroupPattern oldGroupPattern = currentGroupPattern;
+        try {
--- End diff --

updated.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071519#comment-16071519 ]

ASF GitHub Bot commented on FLINK-6927:
---------------------------------------

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173760

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
@@ -366,8 +366,9 @@ public Quantifier getQuantifier() {
         checkIfNoNotPattern();
         checkIfQuantifierApplied();
         this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
-        if (from == 0) {
--- End diff --

Thanks for the suggestion, created PR: https://github.com/apache/flink/pull/4242
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069900#comment-16069900 ]

ASF GitHub Bot commented on FLINK-6927:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125005796

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
@@ -366,8 +366,9 @@ public Quantifier getQuantifier() {
         checkIfNoNotPattern();
         checkIfQuantifierApplied();
         this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
-        if (from == 0) {
--- End diff --

Could you submit the change to TimesRange with another JIRA/PR? Let's make one change at a time.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069902#comment-16069902 ]

ASF GitHub Bot commented on FLINK-6927:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r124994181

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java ---
@@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) {
         if (usedNames.contains(name)) {
             throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
         }
+        usedNames.add(name);
+    }
+
--- End diff --

Javadoc is missing.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069905#comment-16069905 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125009258 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { + + @Test + public void testGroupFollowedBy() { + List> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event a2 = new Event(43, "a", 4.0); + Event b2 = new Event(44, "b", 5.0); + Event d = new Event(45, "d", 6.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(b2, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy(Pattern.begin("middle1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("middle2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + })).times(2).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(c, a1, b1, a2, b2, d) +
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069898#comment-16069898 ]

ASF GitHub Bot commented on FLINK-6927:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125005277

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
@@ -430,6 +431,54 @@ public Quantifier getQuantifier() {
         return this;
     }

+    /**
+     * Starts a new pattern sequence. The provided pattern is the initial pattern
+     * of the new sequence.
+     *
+     * @param group the pattern to begin with
+     * @return the first pattern of a pattern sequence
+     */
+    public static GroupPattern begin(Pattern group) {
+        return new GroupPattern<>(null, group);
+    }
+
+    /**
+     * Appends a new pattern to the existing one. The new pattern enforces non-strict
--- End diff --

I would update the docs to stress more clearly that this method is used for appending a group pattern.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069897#comment-16069897 ]

ASF GitHub Bot commented on FLINK-6927:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r124999092

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State loopingState) {
     }

     /**
+     * Create all the states for the group pattern.
+     *
+     * @param groupPattern the group pattern to create the states for
+     * @param sinkState the state that the group pattern being converted should point to
+     * @param proceedState the state that the group pattern being converted should proceed to
+     * @param isOptional whether the group pattern being converted is optional
+     * @return the first state of the states of the group pattern
+     */
+    private State createGroupPatternState(
+        final GroupPattern groupPattern,
+        final State sinkState,
+        final State proceedState,
+        final boolean isOptional) {
+        final IterativeCondition trueFunction = BooleanConditions.trueFunction();
+
+        Pattern oldCurrentPattern = currentPattern;
+        Pattern oldFollowingPattern = followingPattern;
+        GroupPattern oldGroupPattern = currentGroupPattern;
+        try {
--- End diff --

What is the point of this `try` block? Why not:

    private State createGroupPatternState(
        final GroupPattern groupPattern,
        final State sinkState,
        final State proceedState,
        final boolean isOptional) {
        final IterativeCondition trueFunction = BooleanConditions.trueFunction();

        Pattern oldCurrentPattern = currentPattern;
        Pattern oldFollowingPattern = followingPattern;
        GroupPattern oldGroupPattern = currentGroupPattern;

        State lastSink = sinkState;
        currentGroupPattern = groupPattern;
        currentPattern = groupPattern.getRawPattern();
        lastSink = createMiddleStates(lastSink);
        lastSink = convertPattern(lastSink);
        if (isOptional) {
            // for the first state of a group pattern, its PROCEED edge should point to
            // the following state of that group pattern
            lastSink.addProceed(proceedState, trueFunction);
        }
        currentPattern = oldCurrentPattern;
        followingPattern = oldFollowingPattern;
        currentGroupPattern = oldGroupPattern;
        return lastSink;
    }
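For readers following the question about the `try` block, here is a minimal, self-contained sketch of the difference between the two variants. It is not Flink code: `ScopedStateSketch`, `convertWithoutFinally`, `convertWithFinally` and `visit` are hypothetical names, and the `current` field only stands in for fields such as `currentPattern`. The sketch shows the general Java behaviour: without `finally`, the saved value is not restored when the nested conversion throws. Whether that matters for the real compiler is not settled in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a compiler that temporarily overwrites a mutable field. */
class ScopedStateSketch {
    // Plays the role of fields such as currentPattern in the quoted diff (assumption).
    String current = "outer";
    final List<String> visited = new ArrayList<>();

    /** Restores `current` only when the nested work completes normally. */
    String convertWithoutFinally(String nested, boolean fail) {
        String old = current;
        current = nested;
        visit(fail);
        current = old; // skipped if visit() throws
        return nested;
    }

    /** Restores `current` on every exit path, including exceptions. */
    String convertWithFinally(String nested, boolean fail) {
        String old = current;
        try {
            current = nested;
            visit(fail);
            return nested;
        } finally {
            current = old;
        }
    }

    /** Records the field value in effect and optionally fails, like a malformed nested pattern. */
    private void visit(boolean fail) {
        visited.add(current);
        if (fail) {
            throw new IllegalStateException("malformed pattern: " + current);
        }
    }
}
```

If the compiler instance is discarded after any exception, both variants behave the same for callers, which may be the reviewer's point; the `finally` form only matters when the object is reused after a failure.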
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069908#comment-16069908 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125010592 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { + + @Test + public void testGroupFollowedBy() { + List> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event a2 = new Event(43, "a", 4.0); + Event b2 = new Event(44, "b", 5.0); + Event d = new Event(45, "d", 6.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(b2, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy(Pattern.begin("middle1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("middle2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + })).times(2).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(c, a1, b1, a2, b2, d) +
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069901#comment-16069901 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125012573 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
--- End diff --

I am missing a test or two for `... followedByAny(...).oneOrMore()`. A test or two with `notNext` and `notFollowedBy` after group patterns would also be helpful, I think.
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069907#comment-16069907 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125009695 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { + + @Test + public void testGroupFollowedBy() { + List> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event a2 = new Event(43, "a", 4.0); + Event b2 = new Event(44, "b", 5.0); + Event d = new Event(45, "d", 6.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(b2, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy(Pattern.begin("middle1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("middle2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + })).times(2).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(c, a1, b1, a2, b2, d) +
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069904#comment-16069904 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125009025 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+    @Test
+    public void testGroupFollowedBy() {
--- End diff --

`testGroupFollowedBy` -> `testGroupFollowedByTimes`
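As background for the rename, the pattern exercised by `testGroupFollowedBy` is `c`, followed by the group (`a` followed by `b`) taken `times(2)`, followed by `d`. The sketch below is a toy illustration of what such a group quantifier means; it is not how Flink's NFA works. `GroupTimesSketch`, `expand` and `firstMatch` are hypothetical helpers, events are reduced to their names, and matching is a simple in-order scan that skips non-matching events and returns only the first match.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of "prefix --> (group).times(n) --> suffix" over event names (not Flink's NFA). */
class GroupTimesSketch {

    /** Flattens the pattern by repeating the group `times` times between prefix and suffix. */
    static List<String> expand(List<String> prefix, List<String> group, int times, List<String> suffix) {
        List<String> flat = new ArrayList<>(prefix);
        for (int i = 0; i < times; i++) {
            flat.addAll(group);
        }
        flat.addAll(suffix);
        return flat;
    }

    /**
     * Returns the indices of the first in-order match of `expected` in `events`,
     * skipping events that do not match, or an empty list if there is no match.
     */
    static List<Integer> firstMatch(List<String> expected, List<String> events) {
        List<Integer> picked = new ArrayList<>();
        int next = 0;
        for (int i = 0; i < events.size() && next < expected.size(); i++) {
            if (events.get(i).equals(expected.get(next))) {
                picked.add(i);
                next++;
            }
        }
        return next == expected.size() ? picked : Collections.<Integer>emptyList();
    }
}
```

Under these assumptions, the input `c, a, b, a, b, d` matches with all six events in order, which corresponds to the single expected result `(c, a1, b1, a2, b2, d)` in the quoted test, while `c, a, b, d` does not match because the group occurs only once.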
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069909#comment-16069909 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125012236 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { + + @Test + public void testGroupFollowedBy() { + List> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event a2 = new Event(43, "a", 4.0); + Event b2 = new Event(44, "b", 5.0); + Event d = new Event(45, "d", 6.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(b2, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + Pattern pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy(Pattern.begin("middle1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("middle2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + })).times(2).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.>newArrayList( + Lists.newArrayList(c, a1, b1, a2, b2, d) +
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069906#comment-16069906 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125005984 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -455,6 +548,76 @@ private void addStopStateToLooping(final State loopingState) { } /** +* Create all the states for the group pattern. +* +* @param groupPattern the group pattern to create the states for +* @param sinkState the state that the group pattern being converted should point to +* @param proceedState the state that the group pattern being converted should proceed to +* @param isOptional whether the group pattern being converted is optional +* @return the first state of the states of the group pattern +*/ + private State createGroupPatternState( + final GroupPattern groupPattern, + final State sinkState, + final State proceedState, + final boolean isOptional) { + final IterativeCondition trueFunction = BooleanConditions.trueFunction(); + + Pattern oldCurrentPattern = currentPattern; + Pattern oldFollowingPattern = followingPattern; + GroupPattern oldGroupPattern = currentGroupPattern; + try { + State lastSink = sinkState; + currentGroupPattern = groupPattern; + currentPattern = groupPattern.getRawPattern(); + lastSink = createMiddleStates(lastSink); + lastSink = convertPattern(lastSink); + if (isOptional) { + // for the first state of a group pattern, its PROCEED edge should point to + // the following state of that group pattern + lastSink.addProceed(proceedState, trueFunction); + } + return lastSink; + } finally { + currentPattern = oldCurrentPattern; + followingPattern = oldFollowingPattern; + currentGroupPattern = oldGroupPattern; + } + } + + /** +* Create the states for the group pattern as a looping one. 
+* +* @param groupPattern the group pattern to create the states for +* @param sinkState the state that the group pattern being converted should point to +* @return the first state of the states of the group pattern +*/ + private State createLoopingGroupPatternState( + final GroupPattern groupPattern, + final State sinkState) { + final IterativeCondition trueFunction = BooleanConditions.trueFunction(); + + Pattern oldCurrentPattern = currentPattern; + Pattern oldFollowingPattern = followingPattern; + GroupPattern oldGroupPattern = currentGroupPattern; + try { --- End diff -- Same as above. Why `try`? > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
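One plausible reading of the `try` questioned in the review comment above: the quoted method overwrites instance fields (`currentPattern`, `followingPattern`, `currentGroupPattern`) while it descends into the group, and the `finally` block puts the saved values back even if conversion throws. The sketch below only illustrates that save-and-restore idiom in isolation. `CompilerContextSketch`, `compileGroup` and the string-valued cursor are hypothetical stand-ins, not Flink classes, and whether an exception can actually occur at that point in NFACompiler is not established by the thread.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a compiler that tracks the pattern it is currently translating. */
final class CompilerContextSketch {
    // Mutable cursor, loosely analogous to currentPattern in the quoted diff (assumption).
    private String currentPattern = "outer";

    String current() {
        return currentPattern;
    }

    /** Points the cursor at groupPattern, runs body, then restores the previous cursor. */
    <R> R compileGroup(String groupPattern, Supplier<R> body) {
        String oldCurrentPattern = currentPattern;
        try {
            currentPattern = groupPattern;
            return body.get();
        } finally {
            // Runs on normal return and on exception, so the outer state is never left dirty.
            currentPattern = oldCurrentPattern;
        }
    }

    public static void main(String[] args) {
        CompilerContextSketch ctx = new CompilerContextSketch();
        String seen = ctx.compileGroup("group", ctx::current);
        System.out.println(seen + " -> " + ctx.current());
        try {
            ctx.compileGroup("broken", () -> {
                throw new IllegalStateException("conversion failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("after failure: " + ctx.current());
        }
    }
}
```

If no exception can be thrown between saving and restoring, plain assignments after the conversion calls would behave the same, which may be what the reviewer is hinting at.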
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069899#comment-16069899 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125005837 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java --- @@ -153,9 +153,8 @@ public int hashCode() { private final int to; private Times(int from, int to) { - Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0."); + Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0."); --- End diff -- Move to another JIRA/PR > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069903#comment-16069903 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125005892 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java --- @@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception { } @Test + public void testTimesRangeFromZero() { --- End diff -- Move to another JIRA/PR > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068531#comment-16068531 ] ASF GitHub Bot commented on FLINK-6927: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4153 Hi @dianfu and @dawidwys. I think that we should stick to the current API because: 1) a new API will raise serious backwards compatibility concerns, 2) people have already started using the current API, and 3) there are two parallel efforts going on (SQL / CEP standalone) and we should not block one on the other. If we agree on this, then this PR is ready for review, @dawidwys. > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068013#comment-16068013 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4153 @dianfu Sorry, I have not reviewed it yet, but I do think this feature would benefit from the rework of the Pattern API that I proposed in [FLINK-3414](https://issues.apache.org/jira/browse/FLINK-3414). Instead of checking for the head/tail of a group pattern and caching them, we could more or less reuse the current code for plain sequence creation, which could return begin/end states. Then the code for joining groups would be much simpler, because it would operate on already translated sequences. I think the new API would also make [FLINK-4641](https://issues.apache.org/jira/browse/FLINK-4641) much easier. As it would require an API rework, I would really like to hear @kl0u's opinion. If we agree not to change the API, though, I will go straight to reviewing this PR. > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
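The idea in the comment above (translate each plain sequence on its own, have the translation return its begin/end states, then join the already translated pieces) can be sketched roughly as follows. This is only an illustration of the shape of that design, not the API proposed in FLINK-3414: `FragmentSketch`, `Fragment`, `sequence` and `join` are hypothetical names, and states are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: translated sequences expose begin/end states so that joining is trivial. */
final class FragmentSketch {

    /** An already translated sequence, described by its first state, last state, and edges. */
    static final class Fragment {
        final String begin;
        final String end;
        final List<String> edges;

        Fragment(String begin, String end, List<String> edges) {
            this.begin = begin;
            this.end = end;
            this.edges = edges;
        }
    }

    /** Translates a plain sequence of state names into a fragment with chained edges. */
    static Fragment sequence(String... names) {
        if (names.length == 0) {
            throw new IllegalArgumentException("a sequence needs at least one state");
        }
        List<String> edges = new ArrayList<>();
        for (int i = 0; i + 1 < names.length; i++) {
            edges.add(names[i] + " -> " + names[i + 1]);
        }
        return new Fragment(names[0], names[names.length - 1], edges);
    }

    /** Joins two fragments by linking the end of the left one to the begin of the right one. */
    static Fragment join(Fragment left, Fragment right) {
        List<String> edges = new ArrayList<>(left.edges);
        edges.add(left.end + " -> " + right.begin);
        edges.addAll(right.edges);
        return new Fragment(left.begin, right.end, edges);
    }

    public static void main(String[] args) {
        // A, then the group (B, C), then D, joined from independently translated pieces.
        Fragment joined = join(join(sequence("A"), sequence("B", "C")), sequence("D"));
        System.out.println(joined.begin + " ... " + joined.end + " via " + joined.edges);
    }
}
```

Because `join` only needs `begin` and `end`, a group never has to be inspected for its head or tail after translation, which is the simplification the comment is pointing at.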
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065826#comment-16065826 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 Rebased the code. @dawidwys @kl0u could you help take a look at this PR? > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058676#comment-16058676 ] ASF GitHub Bot commented on FLINK-6927: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 @dawidwys @kl0u It would be great if you could take a look at this PR. > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6927) Support pattern group in CEP
[ https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057193#comment-16057193 ] ASF GitHub Bot commented on FLINK-6927: --- GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4153 [FLINK-6927] [cep] Support pattern group in CEP Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink FLINK-6927 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4153.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4153 commit 6b295b82ddcda542ef0832bdf8405c4bad975882 Author: Dian Fu Date: 2017-06-21T08:41:21Z [FLINK-6927] [cep] Support pattern group in CEP > Support pattern group in CEP > > > Key: FLINK-6927 > URL: https://issues.apache.org/jira/browse/FLINK-6927 > 
Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > We should add support for pattern group. This would enrich the set of > supported patterns. For example, users can write patterns like this with this > feature available: > {code} > A --> (B --> C.times(3)).optional() --> D > {code} > or > {code} > A --> (B --> C).times(3) --> D > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
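To make the second example pattern from the issue description above concrete: under the simplifying assumption that `-->` means "followed by, possibly with unrelated events in between", `A --> (B --> C).times(3) --> D` behaves like the group body written out three times between `A` and `D`. The sketch below expands a group that way and checks an event-name sequence against the flat result. It is plain Java with no Flink dependency; `GroupExpansionSketch`, `expand` and `matchesRelaxed` are hypothetical helpers, and the actual NFACompiler builds NFA states rather than expanding lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of what a quantified group means; not how Flink compiles it. */
final class GroupExpansionSketch {

    /** Writes prefix, then the group body repeated the given number of times, then suffix. */
    static List<String> expand(List<String> prefix, List<String> group, int times, List<String> suffix) {
        List<String> flat = new ArrayList<>(prefix);
        for (int i = 0; i < times; i++) {
            flat.addAll(group);
        }
        flat.addAll(suffix);
        return flat;
    }

    /** True if pattern occurs in events in order, allowing unrelated events in between. */
    static boolean matchesRelaxed(List<String> pattern, List<String> events) {
        int next = 0;
        for (String event : events) {
            if (next < pattern.size() && pattern.get(next).equals(event)) {
                next++;
            }
        }
        return next == pattern.size();
    }

    public static void main(String[] args) {
        // A --> (B --> C).times(3) --> D
        List<String> flat = expand(Arrays.asList("A"), Arrays.asList("B", "C"), 3, Arrays.asList("D"));
        System.out.println(flat);
        System.out.println(matchesRelaxed(flat, Arrays.asList("A", "B", "C", "B", "C", "B", "C", "D")));
    }
}
```

In the same simplified model, an optional group such as `(B --> C.times(3)).optional()` corresponds to accepting either the expansion with the group present once or the expansion with `times` set to 0.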