dianfu closed pull request #5080: [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper. URL: https://github.com/apache/flink/pull/5080
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java index 4423bb1dd40..95225271b9e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java @@ -20,7 +20,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.EventComparator; @@ -28,6 +29,7 @@ import org.apache.flink.cep.PatternFlatTimeoutFunction; import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.util.OutputTag; @@ -102,7 +104,7 @@ protected void processTimedOutSequences( * in one udf. 
*/ @Internal - public static class FlatSelectWrapper<IN, OUT1, OUT2> implements Function { + public static class FlatSelectWrapper<IN, OUT1, OUT2> extends AbstractRichFunction { private static final long serialVersionUID = -8320546120157150202L; @@ -125,5 +127,25 @@ public FlatSelectWrapper( this.flatSelectFunction = flatSelectFunction; this.flatTimeoutFunction = flatTimeoutFunction; } + + @Override + public void open(Configuration parameters) throws Exception { + if (flatSelectFunction instanceof RichFunction) { + ((RichFunction) flatSelectFunction).open(parameters); + } + if (flatTimeoutFunction instanceof RichFunction) { + ((RichFunction) flatTimeoutFunction).open(parameters); + } + } + + @Override + public void close() throws Exception { + if (flatSelectFunction instanceof RichFunction) { + ((RichFunction) flatSelectFunction).close(); + } + if (flatTimeoutFunction instanceof RichFunction) { + ((RichFunction) flatTimeoutFunction).close(); + } + } } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java index cb233a486ec..18a1454cf4c 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java @@ -19,7 +19,8 @@ package org.apache.flink.cep.operator; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.EventComparator; @@ -27,6 +28,7 @@ import org.apache.flink.cep.PatternTimeoutFunction; import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; 
import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; @@ -54,8 +56,8 @@ public SelectTimeoutCepOperator( NFACompiler.NFAFactory<IN> nfaFactory, final EventComparator<IN> comparator, AfterMatchSkipStrategy skipStrategy, - PatternSelectFunction<IN, OUT1> flatSelectFunction, - PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction, + PatternSelectFunction<IN, OUT1> selectFunction, + PatternTimeoutFunction<IN, OUT2> timeoutFunction, OutputTag<OUT2> outputTag) { super( inputSerializer, @@ -63,14 +65,14 @@ public SelectTimeoutCepOperator( nfaFactory, comparator, skipStrategy, - new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction)); + new SelectWrapper<>(selectFunction, timeoutFunction)); this.timedOutOutputTag = outputTag; } @Override protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception { for (Map<String, List<IN>> match : matchingSequences) { - output.collect(new StreamRecord<>(getUserFunction().getFlatSelectFunction().select(match), timestamp)); + output.collect(new StreamRecord<>(getUserFunction().getSelectFunction().select(match), timestamp)); } } @@ -80,7 +82,7 @@ protected void processTimedOutSequences( for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) { output.collect(timedOutOutputTag, new StreamRecord<>( - getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1), + getUserFunction().getTimeoutFunction().timeout(match.f0, match.f1), timestamp)); } } @@ -93,26 +95,46 @@ protected void processTimedOutSequences( * @param <OUT2> Type of the timed out output elements */ @Internal - public static class SelectWrapper<IN, OUT1, OUT2> implements Function { + public static class SelectWrapper<IN, OUT1, OUT2> extends AbstractRichFunction { private static final long serialVersionUID = -8320546120157150202L; - 
private PatternSelectFunction<IN, OUT1> flatSelectFunction; - private PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction; + private PatternSelectFunction<IN, OUT1> selectFunction; + private PatternTimeoutFunction<IN, OUT2> timeoutFunction; - PatternSelectFunction<IN, OUT1> getFlatSelectFunction() { - return flatSelectFunction; + PatternSelectFunction<IN, OUT1> getSelectFunction() { + return selectFunction; } - PatternTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() { - return flatTimeoutFunction; + PatternTimeoutFunction<IN, OUT2> getTimeoutFunction() { + return timeoutFunction; } public SelectWrapper( - PatternSelectFunction<IN, OUT1> flatSelectFunction, - PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction) { - this.flatSelectFunction = flatSelectFunction; - this.flatTimeoutFunction = flatTimeoutFunction; + PatternSelectFunction<IN, OUT1> selectFunction, + PatternTimeoutFunction<IN, OUT2> timeoutFunction) { + this.selectFunction = selectFunction; + this.timeoutFunction = timeoutFunction; + } + + @Override + public void open(Configuration parameters) throws Exception { + if (selectFunction instanceof RichFunction) { + ((RichFunction) selectFunction).open(parameters); + } + if (timeoutFunction instanceof RichFunction) { + ((RichFunction) timeoutFunction).open(parameters); + } + } + + @Override + public void close() throws Exception { + if (selectFunction instanceof RichFunction) { + ((RichFunction) selectFunction).close(); + } + if (timeoutFunction instanceof RichFunction) { + ((RichFunction) timeoutFunction).close(); + } } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services