[ 
https://issues.apache.org/jira/browse/FLINK-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633679#comment-16633679
 ] 

ASF GitHub Bot commented on FLINK-10417:
----------------------------------------

dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw 
exception on pattern variable mis…
URL: https://github.com/apache/flink/pull/6758
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ad321bf71b5..d7f915fc62e 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1385,6 +1385,23 @@ Pattern.begin("patternName", skipStrategy)
 </div>
 </div>
 
+{% warn Attention %} For SKIP_TO_FIRST/LAST there are two options for handling cases where no elements are mapped to
+the specified variable. By default, a NO_SKIP strategy is used in this case. The other option is to throw an exception in such a situation.
+One can enable this option by:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+</div>
+</div>
+
 ## Detecting Patterns
 
 After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index 8151a124af4..f4448a35560 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -43,8 +43,8 @@
         * @param patternName the pattern name to skip to
         * @return the created AfterMatchSkipStrategy
         */
-       public static AfterMatchSkipStrategy skipToFirst(String patternName) {
-               return new SkipToFirstStrategy(patternName);
+       public static SkipToFirstStrategy skipToFirst(String patternName) {
+               return new SkipToFirstStrategy(patternName, false);
        }
 
        /**
@@ -53,8 +53,8 @@ public static AfterMatchSkipStrategy skipToFirst(String patternName) {
         * @param patternName the pattern name to skip to
         * @return the created AfterMatchSkipStrategy
         */
-       public static AfterMatchSkipStrategy skipToLast(String patternName) {
-               return new SkipToLastStrategy(patternName);
+       public static SkipToLastStrategy skipToLast(String patternName) {
+               return new SkipToLastStrategy(patternName, false);
        }
 
        /**
@@ -62,7 +62,7 @@ public static AfterMatchSkipStrategy skipToLast(String patternName) {
         *
         * @return the created AfterMatchSkipStrategy
         */
-       public static AfterMatchSkipStrategy skipPastLastEvent() {
+       public static SkipPastLastStrategy skipPastLastEvent() {
                return SkipPastLastStrategy.INSTANCE;
        }
 
@@ -71,7 +71,7 @@ public static AfterMatchSkipStrategy skipPastLastEvent() {
         *
         * @return the created AfterMatchSkipStrategy
         */
-       public static AfterMatchSkipStrategy noSkip() {
+       public static NoSkipStrategy noSkip() {
                return NoSkipStrategy.INSTANCE;
        }
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
new file mode 100644
index 00000000000..5554151ccbd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class SkipToElementStrategy extends AfterMatchSkipStrategy {
+       private static final long serialVersionUID = 7127107527654629026L;
+       private final String patternName;
+       private final boolean shouldThrowException;
+
+       SkipToElementStrategy(String patternName, boolean shouldThrowException) {
+               this.patternName = checkNotNull(patternName);
+               this.shouldThrowException = shouldThrowException;
+       }
+
+       @Override
+       public boolean isSkipStrategy() {
+               return true;
+       }
+
+       @Override
+       protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+               return startEventID != null && startEventID.compareTo(pruningId) < 0;
+       }
+
+       @Override
+       protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+               EventId pruningId = null;
+               for (Map<String, List<EventId>> resultMap : match) {
+                       List<EventId> pruningPattern = resultMap.get(patternName);
+                       if (pruningPattern == null || pruningPattern.isEmpty()) {
+                               if (shouldThrowException) {
+                                       throw new FlinkRuntimeException(String.format(
+                                               "Could not skip to %s. No such element in the found match %s",
+                                               patternName,
+                                               resultMap));
+                               }
+                       } else {
+                               pruningId = max(pruningId, pruningPattern.get(getIndex(pruningPattern.size())));
+                       }
+               }
+
+               return pruningId;
+       }
+
+       @Override
+       public Optional<String> getPatternName() {
+               return Optional.of(patternName);
+       }
+
+       /**
+        * Tells which element from the list of events mapped to *PatternName* to use.
+        *
+        * @param size number of elements mapped to the *PatternName*
+        * @return index of event mapped to *PatternName* to use for pruning
+        */
+       abstract int getIndex(int size);
+
+       /**
+        * Enables throwing exception if no events mapped to the *PatternName*. If not enabled and no events were mapped,
+        * {@link NoSkipStrategy} will be used
+        */
+       public abstract SkipToElementStrategy throwExceptionOnMiss();
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
index 7d7be4a4b1e..e8befebe5fc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
@@ -18,59 +18,30 @@
 
 package org.apache.flink.cep.nfa.aftermatch;
 
-import org.apache.flink.cep.nfa.sharedbuffer.EventId;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Discards every partial match that contains event of the match preceding the first of *PatternName*.
  */
-public class SkipToFirstStrategy extends AfterMatchSkipStrategy {
-
+public final class SkipToFirstStrategy extends SkipToElementStrategy {
        private static final long serialVersionUID = 7127107527654629026L;
-       private final String patternName;
 
-       SkipToFirstStrategy(String patternName) {
-               this.patternName = checkNotNull(patternName);
+       SkipToFirstStrategy(String patternName, boolean shouldThrowException) {
+               super(patternName, shouldThrowException);
        }
 
        @Override
-       public boolean isSkipStrategy() {
-               return true;
-       }
-
-       @Override
-       protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
-               return startEventID != null && startEventID.compareTo(pruningId) < 0;
-       }
-
-       @Override
-       protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
-               EventId pruniningId = null;
-               for (Map<String, List<EventId>> resultMap : match) {
-                       List<EventId> pruningPattern = resultMap.get(patternName);
-                       if (pruningPattern != null && !pruningPattern.isEmpty()) {
-                               pruniningId = max(pruniningId, pruningPattern.get(0));
-                       }
-               }
-
-               return pruniningId;
+       public SkipToElementStrategy throwExceptionOnMiss() {
+               return new SkipToFirstStrategy(getPatternName().get(), true);
        }
 
        @Override
-       public Optional<String> getPatternName() {
-               return Optional.of(patternName);
+       int getIndex(int size) {
+               return 0;
        }
 
        @Override
        public String toString() {
                return "SkipToFirstStrategy{" +
-                       "patternName='" + patternName + '\'' +
+                       "patternName='" + getPatternName().get() + '\'' +
                        '}';
        }
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
index 0f6c3eddcfb..3c33bd13f62 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
@@ -18,59 +18,30 @@
 
 package org.apache.flink.cep.nfa.aftermatch;
 
-import org.apache.flink.cep.nfa.sharedbuffer.EventId;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Discards every partial match that contains event of the match preceding the last of *PatternName*.
  */
-public class SkipToLastStrategy extends AfterMatchSkipStrategy {
+public final class SkipToLastStrategy extends SkipToElementStrategy {
        private static final long serialVersionUID = 7585116990619594531L;
-       private final String patternName;
 
-       SkipToLastStrategy(String patternName) {
-               this.patternName = checkNotNull(patternName);
-       }
-
-       @Override
-       public boolean isSkipStrategy() {
-               return true;
+       SkipToLastStrategy(String patternName, boolean shouldThrowException) {
+               super(patternName, shouldThrowException);
        }
 
        @Override
-       protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
-               return startEventID != null && startEventID.compareTo(pruningId) < 0;
-       }
-
-       @Override
-       protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
-               EventId pruningId = null;
-               for (Map<String, List<EventId>> resultMap : match) {
-                       List<EventId> pruningPattern = resultMap.get(patternName);
-
-                       if (pruningPattern != null && !pruningPattern.isEmpty()) {
-                               pruningId = max(pruningId, pruningPattern.get(pruningPattern.size() - 1));
-                       }
-               }
-
-               return pruningId;
+       public SkipToElementStrategy throwExceptionOnMiss() {
+               return new SkipToLastStrategy(getPatternName().get(), true);
        }
 
        @Override
-       public Optional<String> getPatternName() {
-               return Optional.of(patternName);
+       int getIndex(int size) {
+               return size - 1;
        }
 
        @Override
        public String toString() {
                return "SkipToLastStrategy{" +
-                       "patternName='" + patternName + '\'' +
+                       "patternName='" + getPatternName().get() + '\'' +
                        '}';
        }
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index 4462d103164..6681090d8b2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -24,6 +24,7 @@
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -432,6 +433,74 @@ public boolean filter(Event value) throws Exception {
                ));
        }
 
+       @Test(expected = FlinkRuntimeException.class)
+       public void testSkipToFirstNonExistentPosition() throws Exception {
+               MissedSkipTo.compute(AfterMatchSkipStrategy.skipToFirst("b").throwExceptionOnMiss());
+
+               //exception should be thrown
+       }
+
+       @Test
+       public void testSkipToFirstNonExistentPositionWithoutException() throws Exception {
+               List<List<Event>> resultingPatterns = MissedSkipTo.compute(AfterMatchSkipStrategy.skipToFirst("b"));
+
+               compareMaps(resultingPatterns, Collections.singletonList(
+                       Lists.newArrayList(MissedSkipTo.a, MissedSkipTo.c)
+               ));
+       }
+
+       @Test(expected = FlinkRuntimeException.class)
+       public void testSkipToLastNonExistentPosition() throws Exception {
+               MissedSkipTo.compute(AfterMatchSkipStrategy.skipToLast("b").throwExceptionOnMiss());
+
+               //exception should be thrown
+       }
+
+       @Test
+       public void testSkipToLastNonExistentPositionWithoutException() throws Exception {
+               List<List<Event>> resultingPatterns = MissedSkipTo.compute(AfterMatchSkipStrategy.skipToLast("b"));
+
+               compareMaps(resultingPatterns, Collections.singletonList(
+                       Lists.newArrayList(MissedSkipTo.a, MissedSkipTo.c)
+               ));
+       }
+
+       static class MissedSkipTo {
+               static Event a = new Event(1, "a", 0.0);
+               static Event c = new Event(4, "c", 0.0);
+
+               static List<List<Event>> compute(AfterMatchSkipStrategy skipStrategy) throws Exception {
+                       List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+                       streamEvents.add(new StreamRecord<>(a));
+                       streamEvents.add(new StreamRecord<>(c));
+
+                       Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(
+                               new SimpleCondition<Event>() {
+
+                                       @Override
+                                       public boolean filter(Event value) throws Exception {
+                                               return value.getName().contains("a");
+                                       }
+                               }
+                       ).next("b").where(new SimpleCondition<Event>() {
+                               @Override
+                               public boolean filter(Event value) throws Exception {
+                                       return value.getName().contains("b");
+                               }
+                       }).oneOrMore().optional().consecutive()
+                               .next("c").where(new SimpleCondition<Event>() {
+                                       @Override
+                                       public boolean filter(Event value) throws Exception {
+                                               return value.getName().contains("c");
+                                       }
+                               });
+                       NFA<Event> nfa = compile(pattern, false);
+
+                       return feedNFA(streamEvents, nfa, skipStrategy);
+               }
+       }
+
        @Test
        public void testSkipToLastWithOneOrMore() throws Exception {
                List<StreamRecord<Event>> streamEvents = new ArrayList<>();
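
For readers skimming the diff, the miss handling added in SkipToElementStrategy.getPruningId can be sketched without any Flink dependency. This is a simplified stand-in and not the merged code: the class SkipToMissSketch and its pruningId helper are hypothetical names, event ids are plain Long values instead of EventId, IllegalStateException stands in for FlinkRuntimeException, and the sketch assumes the inherited max(...) helper treats a null id as the smallest value.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the SKIP_TO_FIRST/LAST miss handling; not Flink code. */
public class SkipToMissSketch {

    /**
     * Hypothetical helper mirroring the shape of getPruningId in the diff.
     * Returns null when no match maps any event to patternName and
     * throwOnMiss is false, which models the default fallback.
     */
    static Long pruningId(
            List<Map<String, List<Long>>> matches,
            String patternName,
            boolean skipToLast,
            boolean throwOnMiss) {
        Long pruningId = null;
        for (Map<String, List<Long>> match : matches) {
            List<Long> events = match.get(patternName);
            if (events == null || events.isEmpty()) {
                if (throwOnMiss) {
                    // Stand-in for the FlinkRuntimeException thrown in the diff.
                    throw new IllegalStateException(String.format(
                        "Could not skip to %s. No such element in the found match %s",
                        patternName, match));
                }
                // Without the flag a miss is ignored and pruningId is left as-is.
            } else {
                // Counterpart of getIndex(size): 0 for first, size - 1 for last.
                int index = skipToLast ? events.size() - 1 : 0;
                long candidate = events.get(index);
                // Assumption: null counts as "smaller than any id".
                pruningId = (pruningId == null) ? candidate : Math.max(pruningId, candidate);
            }
        }
        return pruningId;
    }

    public static void main(String[] args) {
        // Same shape as the MissedSkipTo test: events for "a" and "c", none for "b".
        Map<String, List<Long>> match = new HashMap<>();
        match.put("a", Arrays.asList(1L));
        match.put("c", Arrays.asList(4L));
        List<Map<String, List<Long>>> matches = Collections.singletonList(match);

        System.out.println("default: " + pruningId(matches, "b", false, false));
        try {
            pruningId(matches, "b", false, true);
        } catch (IllegalStateException e) {
            System.out.println("throwExceptionOnMiss: " + e.getMessage());
        }
    }
}
```

In the sketch the first/last distinction collapses into a boolean, whereas the PR keeps it in the abstract getIndex(int size) method so that SkipToFirstStrategy and SkipToLastStrategy differ only in the index they return.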


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add option to throw exception on pattern variable miss with SKIP_TO_FIRST/LAST
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-10417
>                 URL: https://issues.apache.org/jira/browse/FLINK-10417
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
