[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15258427#comment-15258427 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1905

> Scala API for CEP
> -
>
> Key: FLINK-3708
> URL: https://issues.apache.org/jira/browse/FLINK-3708
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Stefan Richter
>
> Currently, The CEP library does not support Scala case classes, because the
> {{TypeExtractor}} cannot handle them. In order to support them, it would be
> necessary to offer a Scala API for the CEP library.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15258406#comment-15258406 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-214806804

Failing test case is unrelated. Really good work @StefanRRichter. Will merge it. As a follow up, we should update the CEP documentation to also include Scala code examples.
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15257838#comment-15257838 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-214688703

All mentioned issues should be addressed now and the PR is ready.
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15257831#comment-15257831 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r61057258

--- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  *
+  * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
+  * a [[org.apache.flink.cep.nfa.NFA]].
+  *
+  * {{{
+  * Pattern<T, F> pattern = Pattern.<T>begin("start")
+  *   .next("middle").subtype(F.class)
+  *   .followedBy("end").where(new MyFilterFunction());
+  * }
+  * }}}
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is constrained
+  */
+class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+  /**
+    *
+    * @return Name of the pattern operator
+    */
+  def getName: String = jPattern.getName
--- End diff --

Ok, I truly wasn't aware of that. But it makes sense of course :-)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254004#comment-15254004 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-213443445

The current version should cover all of your feedback and could be pulled @tillrohrmann .
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251547#comment-15251547 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60542216

--- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+
+import _root_.scala.reflect.ClassTag
+
+package object pattern {
+  /**
+    * Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
+    * for usage with the Scala API.
+    *
+    * @param javaPattern The underlying pattern from the Java API
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return A pattern from the Scala API which wraps the pattern from the Java API
+    */
+  private[flink] def wrapPattern[
+  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
+  : Pattern[T, F] = javaPattern match {
+    case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
+    case p: JPattern[T, F] => Pattern[T, F](p)
+    case _ => null
--- End diff --

Oh yes, you're totally right Stefan. But then we could maybe return an `Option[Pattern]` from the `wrapPattern` method.
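The idea discussed in this exchange, returning an `Option` from `wrapPattern` so that a `null` coming from the Java API becomes `None`, can be sketched without Flink on the classpath. The following is only a minimal illustration: `JPattern`, `JFollowedByPattern`, `Pattern`, `FollowedByPattern` and `WrapSketch` are simplified stand-ins invented for this sketch, not the real Flink classes, and the code that was eventually merged may look different.

```scala
// Hypothetical, simplified stand-ins for the Java API classes.
class JPattern(val name: String, val previous: JPattern)
class JFollowedByPattern(name: String, previous: JPattern) extends JPattern(name, previous)

// Hypothetical, simplified stand-ins for the Scala wrappers.
class Pattern(val jPattern: JPattern) {
  def getName: String = jPattern.name

  // The Java side may return null when there is no previous pattern;
  // wrapPattern turns that into None.
  def getPrevious: Option[Pattern] = WrapSketch.wrapPattern(jPattern.previous)
}

class FollowedByPattern(jfbPattern: JFollowedByPattern) extends Pattern(jfbPattern)

object WrapSketch {
  def wrapPattern(javaPattern: JPattern): Option[Pattern] = javaPattern match {
    case f: JFollowedByPattern => Some(new FollowedByPattern(f))
    case p: JPattern => Some(new Pattern(p))
    // Type patterns never match null, so only null can reach this case.
    case _ => None
  }
}
```

With this shape, callers never see a `null` wrapper: the first operator of a pattern simply reports `None` as its predecessor.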
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250334#comment-15250334 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60452526

--- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+
+import _root_.scala.reflect.ClassTag
+
+package object pattern {
+  /**
+    * Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
+    * for usage with the Scala API.
+    *
+    * @param javaPattern The underlying pattern from the Java API
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return A pattern from the Scala API which wraps the pattern from the Java API
+    */
+  private[flink] def wrapPattern[
+  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
+  : Pattern[T, F] = javaPattern match {
+    case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
+    case p: JPattern[T, F] => Pattern[T, F](p)
+    case _ => null
--- End diff --

I am not sure if this case should trigger an exception, because the Java API is actually allowed to return null here in case there is no previous pattern. Our pattern will then wrap this as an undefined Option.
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249910#comment-15249910 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-212429457

Thanks for your good contribution @StefanRRichter. I had some inline comments.

It would be good to revisit the link tags in the ScalaDocs and replace them with the ScalaDoc link syntax. I haven't marked all occurrences in the code.

Furthermore, it should be possible to completely throw out the `ClassTag` context bound. I have only marked the first occurrences where I've noticed it.

The Scala code contained several lines which exceeded the maximum line length of 100 characters. The Scala style check plugin, which detects these style violations, is executed when you run `mvn verify`. Thus, it is always a good idea to run `mvn verify` once locally before pushing commits to a PR.

Ping me once you've addressed my comments. Then I'll make sure that the PR is merged.
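As background to the `ClassTag` remark: a context bound such as `T: ClassTag` is shorthand for an additional implicit parameter, and it is only needed when the code requires the runtime class of `T`, for example to create an `Array[T]`. A wrapper that merely delegates to an underlying object does not need it. The sketch below illustrates the difference under that assumption; `ClassTagSketch`, `fill` and `Wrapper` are hypothetical names used only here and are not part of the pull request.

```scala
import scala.reflect.ClassTag

object ClassTagSketch {
  // Needs the ClassTag: building an Array[T] requires the runtime class of T.
  def fill[T: ClassTag](value: T, n: Int): Array[T] = Array.fill(n)(value)

  // Hypothetical delegating wrapper: it never inspects T at runtime,
  // so no ClassTag context bound is required.
  class Wrapper[T](underlying: java.util.List[T]) {
    def size: Int = underlying.size()
    def head: T = underlying.get(0)
  }
}
```

Dropping an unneeded context bound also spares callers from having an implicit `ClassTag` in scope at every call site.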
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249889#comment-15249889 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60407197

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.TemporaryFolder
+
+
+@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
--- End diff --

I think we should not simply copy the Java `CEPITCase` here, because it will test to a large extent what is already tested by the Java `CEPITCase`. What's more interesting to test is whether the Scala CEP can work with pure Scala types, such as a Scala tuple or a Scala collection.

Furthermore, it would be interesting to see whether the `select` call is correctly forwarded to the Scala function. It should not be strictly necessary to write an IT case for that. You should be able to extract the mapper function from the `Transformation` and then simply pass a test value into it. If you see it in the Scala function, then the forwarding should be correct.
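The forwarding check described in this comment can be pictured in isolation: adapt a Scala function to a Java-style interface, call the adapter directly with a test value, and verify that the Scala function saw it. The sketch below assumes nothing about Flink's actual classes; `JSelectFunction` and `adapt` are hypothetical stand-ins for whatever the Scala API wraps internally, and a Scala tuple serves as the event type to show a pure Scala type passing through.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object SelectForwardingSketch {
  // Hypothetical stand-in for a Java-style select function interface.
  trait JSelectFunction[IN, OUT] {
    def select(pattern: JMap[String, IN]): OUT
  }

  // Hypothetical adapter of the kind a Scala API wrapper might create.
  def adapt[IN, OUT](f: JMap[String, IN] => OUT): JSelectFunction[IN, OUT] =
    new JSelectFunction[IN, OUT] {
      override def select(pattern: JMap[String, IN]): OUT = f(pattern)
    }

  // A pure Scala type (a tuple of id and name) as the event type.
  type Event = (Int, String)

  // Passes one test value through the adapter and reports the result
  // together with whether the Scala function was actually invoked.
  def run(): (String, Boolean) = {
    var called = false
    val fn = adapt[Event, String] { m =>
      called = true
      s"${m.get("start")._1},${m.get("end")._1}"
    }
    val matched = new JHashMap[String, Event]()
    matched.put("start", (2, "start"))
    matched.put("end", (8, "end"))
    (fn.select(matched), called)
  }
}
```

No streaming environment is started here, which is the point of the suggestion: the check stays a plain unit test.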
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249870#comment-15249870 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60404836

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.cep.pattern.SubtypeFilterFunction
+import org.apache.flink.cep.scala._
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternTest {
--- End diff --

I think that we shouldn't simply copy the Java `PatternTest` here. It would be better to test that the Scala `Pattern` is correctly represented by the Java `Pattern`, because that representation is used in the end to compile the `NFA`.
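A test along the lines suggested here would build the same pattern once through the Scala wrapper and once through the Java API, then compare the underlying Java structures. The following sketch shows the shape of such a check with toy classes only: `JPattern`, `Pattern` and `names` are hypothetical stand-ins defined here, not Flink's classes, and a real test would compare whatever properties the NFA compiler actually reads.

```scala
object PatternRepresentationSketch {
  // Hypothetical stand-in for the Java pattern: a name plus a link to its predecessor.
  class JPattern(val name: String, val previous: JPattern) {
    def followedBy(next: String): JPattern = new JPattern(next, this)
  }
  object JPattern {
    def begin(name: String): JPattern = new JPattern(name, null)
  }

  // Hypothetical Scala wrapper that only delegates to the Java stand-in.
  class Pattern(val wrapped: JPattern) {
    def followedBy(next: String): Pattern = new Pattern(wrapped.followedBy(next))
  }
  object Pattern {
    def begin(name: String): Pattern = new Pattern(JPattern.begin(name))
  }

  // Walks the Java chain from the last operator back to the first one.
  def names(p: JPattern): List[String] =
    Iterator.iterate(p)(_.previous).takeWhile(_ != null).map(_.name).toList
}
```

Comparing `names(scalaPattern.wrapped)` with `names(javaPattern)` then checks the representation rather than re-testing the matching semantics already covered on the Java side.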
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249856#comment-15249856 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60403142

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/Event.scala ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TypeExtractor
+
+object Event {
+  def createTypeSerializer: TypeSerializer[Event] = {
+    val typeInformation: TypeInformation[Event] = TypeExtractor.createTypeInfo(classOf[Event])
+    return typeInformation.createSerializer(new ExecutionConfig)
+  }
+}
+
+class Event(var id: Int, var name: String, var price: Double) {
--- End diff --

Why not simply reuse the Java `Event` class from the flink-cep test jar?
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249858#comment-15249858 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60403183

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+object StreamEvent {
+  def of[V](event: V, timestamp: Long): StreamEvent[V] = {
+    new StreamEvent[V](event, timestamp)
+  }
+}
+
+class StreamEvent[T](val event: T, val timestamp: Long) {
--- End diff --

We could reuse the `StreamEvent` class from `flink-cep`.
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249852#comment-15249852 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60402984

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.TemporaryFolder
+
+
+@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
+  private var resultPath: String = null
+  private var expected: String = null
+  val _tempFolder = new TemporaryFolder
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  @throws[Exception]
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+    expected = ""
+  }
+
+  @After
+  @throws[Exception]
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  @throws[Exception]
+  def testSimplePatternCEP {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val input: DataStream[Event] = env.fromElements(
+      new Event(1, "barfoo", 1.0),
+      new Event(2, "start", 2.0),
+      new Event(3, "foobar", 3.0),
+      new SubEvent(4, "foo", 4.0, 1.0),
+      new Event(5, "middle", 5.0),
+      new SubEvent(6, "middle", 6.0, 2.0),
+      new SubEvent(7, "bar", 3.0, 3.0),
+      new Event(42, "42", 42.0),
+      new Event(8, "end", 1.0))
+    val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
+      .where((value: Event) => value.name == "start")
+      .followedBy("middle")
+      .subtype(classOf[SubEvent])
+      .where((value: SubEvent) => value.name == "middle")
+      .followedBy("end")
+      .where((value: Event) => value.name == "end")
+    val result: DataStream[String] = CEP.pattern(input, pattern)
+      .select((pattern: JMap[String, Event]) => {
+        val builder: StringBuilder = new StringBuilder
+        builder.append(pattern.get("start").id)
+          .append(",")
+          .append(pattern.get("middle").id)
+          .append(",")
+          .append(pattern.get("end").id)
+          .toString
+      })
+    result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
+    expected = "2,6,8"
+    env.execute
+  }
+
+  @Test
+  @throws[Exception]
+  def testSimpleKeyedPatternCEP {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(2)
+    val input: DataStream[Event] = env.fromElements(
+      new Event(1, "barfoo", 1.0),
+      new Event(2, "start", 2.0),
+      new Event(3, "start", 2.1),
+      new Event(3, "foobar", 3.0),
+      new SubEvent(4, "foo", 4.0, 1.0),
+      new SubEvent(3, "middle", 3.2, 1.0),
+      new Event(42, "start", 3.1),
+      new SubEvent(42, "middle", 3.3, 1.2),
+      new Event(5, "middle", 5.0),
+      new SubEvent(2, "middle", 6.0, 2.0),
+      new SubEvent(7, "bar", 3.0,
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249847#comment-15249847 ]

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60402681

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.TemporaryFolder
+
+
+@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
--- End diff --

Just my personal preference: I would put the annotation on a separate line.
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249842#comment-15249842 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60402439 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern} + +import _root_.scala.reflect.ClassTag + +package object pattern { + /** +* Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses +* for usage with the Scala API. 
+* +* @param javaPattern The underlying pattern from the Java API +* @tparam T Base type of the elements appearing in the pattern +* @tparam F Subtype of T to which the current pattern operator is constrained +* @return A pattern from the Scala API which wraps the pattern from the Java API +*/ + private[flink] def wrapPattern[ + T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F]) + : Pattern[T, F] = javaPattern match { +case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f) +case p: JPattern[T, F] => Pattern[T, F](p) +case _ => null --- End diff -- The `null` case should throw an exception, because we cannot continue from here.
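A minimal sketch of what the review comment asks for, assuming simplified stand-in classes instead of the real Flink types. `JPattern`, `JFollowedByPattern`, `Pattern`, `FollowedByPattern` and `WrapPatternSketch` are hypothetical here, and `IllegalArgumentException` is only one possible choice, since the comment does not name an exception type.

```scala
// Hypothetical stand-ins for the Java API classes; not the real Flink types.
class JPattern(val name: String)
class JFollowedByPattern(n: String) extends JPattern(n)

// Hypothetical stand-ins for the Scala wrapper classes.
class Pattern(val wrapped: JPattern)
class FollowedByPattern(j: JFollowedByPattern) extends Pattern(j)

object WrapPatternSketch {
  def wrapPattern(javaPattern: JPattern): Pattern = javaPattern match {
    case f: JFollowedByPattern => new FollowedByPattern(f)
    case p: JPattern           => new Pattern(p)
    // Type patterns never match null, so a null argument falls through to here.
    // Failing fast replaces the `case _ => null` branch of the reviewed code.
    case _ => throw new IllegalArgumentException("Cannot wrap a null pattern.")
  }
}
```

With this shape, a `null` argument fails at the wrapping call instead of surfacing later as a `NullPointerException` in an unrelated place.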
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249841#comment-15249841 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60402382 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName + + /** +* +* @return Window length in which the pattern match has to occur +*/ + def getWindowTime: Option[Time] = { +val time = jPattern.getWindowTime +if (time == null) None else Some(time) + } + + /** +* +* @return Filter condition for an event to be matched +*/ + def getFilterFunction: Option[FilterFunction[F]] = { +val filterFun = jPattern.getFilterFunction +if (filterFun == null) None else Some(filterFun) + } + + /** +* Applies a subtype constraint on the current pattern operator. This means that an event has +* to be of the given subtype in order to be matched. +* +* @param clazz Class of the subtype +* @tparam S Type of the subtype +* @return The same pattern operator with the new subtype constraint +*/ + def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = { +jPattern.subtype(clazz) +this.asInstanceOf[Pattern[T, S]] + } + + /** +* Defines the maximum time interval for a matching pattern. This means that the time gap +* between first and the last event must not be longer than the window time. +* +* @param windowTime Time of the matching window +* @return The same pattern operator with the new window length +*/ + def within(windowTime: Time): Pattern[T, F] = { +jPattern.within(windowTime) +this + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces strict +* temporal contiguity. 
This means that the whole pattern only matches if an event which matches +* this operator directly follows the preceding matching event. Thus, there cannot be any +* events in between two matching events. +* +* @param name Name of the new pattern operator +* @return A new pattern operator which is appended to this pattern operator +*/ + def next(name: String): Pattern[T, T] = { +wrapPattern(jPattern.next(name)) --- End diff -- Could be done more efficiently with `Pattern[T, T](jPattern.next(name))`.
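The suggestion can be illustrated with a small sketch. It assumes that `next` on the Java side is statically typed to return a plain pattern, so the wrapper can be constructed directly without a runtime type match. The `JPattern` and `Pattern` classes below are simplified, hypothetical stand-ins, not the Flink classes.

```scala
// Hypothetical stand-in for the Java pattern; `next` always yields a plain pattern.
class JPattern[T, F <: T](val name: String) {
  def next(n: String): JPattern[T, T] = new JPattern[T, T](n)
}

// Hypothetical stand-in for the Scala wrapper.
class Pattern[T, F <: T](jPattern: JPattern[T, F]) {
  def getName: String = jPattern.name

  // The static return type of `next` is already known, so the wrapper is
  // built directly instead of going through a pattern match on the runtime type.
  def next(name: String): Pattern[T, T] = Pattern[T, T](jPattern.next(name))
}

object Pattern {
  def apply[T, F <: T](jPattern: JPattern[T, F]): Pattern[T, F] =
    new Pattern[T, F](jPattern)
}
```

A generic `wrapPattern` helper is still useful where the runtime subtype matters; the direct constructor call only applies when the result type is fixed.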
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249834#comment-15249834 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60401939 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName + + /** +* +* @return Window length in which the pattern match has to occur +*/ + def getWindowTime: Option[Time] = { +val time = jPattern.getWindowTime +if (time == null) None else Some(time) + } + + /** +* +* @return Filter condition for an event to be matched +*/ + def getFilterFunction: Option[FilterFunction[F]] = { +val filterFun = jPattern.getFilterFunction +if (filterFun == null) None else Some(filterFun) + } + + /** +* Applies a subtype constraint on the current pattern operator. This means that an event has +* to be of the given subtype in order to be matched. +* +* @param clazz Class of the subtype +* @tparam S Type of the subtype +* @return The same pattern operator with the new subtype constraint +*/ + def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = { +jPattern.subtype(clazz) +this.asInstanceOf[Pattern[T, S]] + } + + /** +* Defines the maximum time interval for a matching pattern. This means that the time gap +* between first and the last event must not be longer than the window time. +* +* @param windowTime Time of the matching window +* @return The same pattern operator with the new window length +*/ + def within(windowTime: Time): Pattern[T, F] = { +jPattern.within(windowTime) +this + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces strict +* temporal contiguity. 
This means that the whole pattern only matches if an event which matches +* this operator directly follows the preceding matching event. Thus, there cannot be any +* events in between two matching events. +* +* @param name Name of the new pattern operator +* @return A new pattern operator which is appended to this pattern operator +*/ + def next(name: String): Pattern[T, T] = { +wrapPattern(jPattern.next(name)) + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces +* non-strict temporal contiguity. This means that a matching
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249831#comment-15249831 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60401794 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName + + /** +* +* @return Window length in which the pattern match has to occur +*/ + def getWindowTime: Option[Time] = { +val time = jPattern.getWindowTime +if (time == null) None else Some(time) + } + + /** +* +* @return Filter condition for an event to be matched +*/ + def getFilterFunction: Option[FilterFunction[F]] = { +val filterFun = jPattern.getFilterFunction +if (filterFun == null) None else Some(filterFun) + } + + /** +* Applies a subtype constraint on the current pattern operator. This means that an event has +* to be of the given subtype in order to be matched. +* +* @param clazz Class of the subtype +* @tparam S Type of the subtype +* @return The same pattern operator with the new subtype constraint +*/ + def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = { +jPattern.subtype(clazz) +this.asInstanceOf[Pattern[T, S]] + } + + /** +* Defines the maximum time interval for a matching pattern. This means that the time gap +* between first and the last event must not be longer than the window time. +* +* @param windowTime Time of the matching window +* @return The same pattern operator with the new window length +*/ + def within(windowTime: Time): Pattern[T, F] = { +jPattern.within(windowTime) +this + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces strict +* temporal contiguity. 
This means that the whole pattern only matches if an event which matches +* this operator directly follows the preceding matching event. Thus, there cannot be any +* events in between two matching events. +* +* @param name Name of the new pattern operator +* @return A new pattern operator which is appended to this pattern operator +*/ + def next(name: String): Pattern[T, T] = { +wrapPattern(jPattern.next(name)) + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces +* non-strict temporal contiguity. This means that a matching
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249828#comment-15249828 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60401470 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Pattern<T, F> pattern = Pattern.<T>begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * --- End diff -- Code examples in ScalaDocs use `{{{ code }}}`.
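For reference, a Scaladoc comment with a code example wrapped in triple braces might look like the sketch below. `DocumentedPattern` and `Event` are hypothetical names, and the call chain inside the comment is illustrative rather than a confirmed signature of the Scala CEP API.

```scala
/**
  * Base class for a pattern definition.
  *
  * {{{
  * val pattern = Pattern.begin[Event]("start")
  *   .next("middle")
  *   .followedBy("end")
  * }}}
  *
  * @param name Name of the pattern operator
  */
class DocumentedPattern(val name: String)
```

Scaladoc renders the text between `{{{` and `}}}` as a preformatted code block, which takes the place of the Javadoc `{@code ...}` construct used in the reviewed comment.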
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249819#comment-15249819 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60400970 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.flink.cep.{PatternStream => JPatternStream} + +import _root_.scala.reflect.ClassTag + +package object scala { + + /** +* Utility method to wrap { @link org.apache.flink.cep.PatternStream} +* for usage with the Scala API. 
+* +* @param javaPatternStream The underlying pattern stream from the Java API +* @tparam T Type of the events +* @return A pattern stream from the Scala API which wraps a pattern stream from the Java API +*/ + private[flink] def wrapPatternStream[T: TypeInformation : ClassTag](javaPatternStream: JPatternStream[T]) + : scala.PatternStream[T] = { +javaPatternStream match { + case p: JPatternStream[T] => PatternStream[T](p) + case _ => null --- End diff -- I think we should throw an `Exception` if we encounter a `null` value here. Otherwise, the subsequent operations would throw a `NullPointerException`. Moreover, there is no good strategy for continuing with a `null` pattern stream. The most Scala-esque way would be:
```
Option(javaPatternStream) match {
  case Some(p) => PatternStream[T](p)
  case None => throw new Exception(...)
}
```
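A self-contained version of the `Option`-based variant, as a sketch only. `JPatternStream` and `PatternStream` are hypothetical stand-ins for the Flink classes, and the exception type and message are assumptions, since the review comment leaves them open.

```scala
// Hypothetical stand-ins; not the real Flink classes.
class JPatternStream[T]
class PatternStream[T](val wrapped: JPatternStream[T])

object WrapPatternStreamSketch {
  def wrapPatternStream[T](javaPatternStream: JPatternStream[T]): PatternStream[T] =
    // Option(x) yields None when x is null and Some(x) otherwise.
    Option(javaPatternStream) match {
      case Some(p) => new PatternStream[T](p)
      case None =>
        throw new IllegalArgumentException("The Java pattern stream must not be null.")
    }
}
```

Wrapping the argument in `Option` makes the null check explicit in the match, so no fall-through `case _` branch is needed.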
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249814#comment-15249814 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60400262 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import java.util.{Map => JMap} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream} +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.util.Collector +import scala.reflect.ClassTag + +/** + * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected + * pattern sequences as a map of events associated with their names. The pattern is detected using a + * { @link org.apache.flink.cep.nfa.NFA}. 
In order to process the detected sequences, the user + * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}. + * + * @param jPatternStream Underlying pattern stream from Java API + * @tparam T Type of the events + */ +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) { + + private[flink] def getWrappedPatternStream = jPatternStream + + /** +* Applies a select function to the detected pattern sequence. For each pattern sequence the +* provided { @link PatternSelectFunction} is called. The pattern select function can produce +* exactly one resulting element. +* +* @param patternSelectFunction The pattern select function which is called for each detected +* pattern sequence. +* @tparam R Type of the resulting elements +* @return { @link DataStream} which contains the resulting elements from the pattern select +* unction. +*/ + def select[R: TypeInformation : ClassTag](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R] = { +asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]])) + } + + /** +* Applies a flat select function to the detected pattern sequence. For each pattern sequence +* the provided { @link PatternFlatSelectFunction} is called. The pattern flat select function +* can produce an arbitrary number of resulting elements. +* +* @param patternFlatSelectFunction The pattern flat select function which is called for each +* detected pattern sequence. +* @tparam R Type of the resulting elements +* @return { @link DataStream} which contains the resulting elements from the pattern flat select +* function. +*/ + def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFunction: PatternFlatSelectFunction[T, R]): DataStream[R] = { +asScalaStream(jPatternStream.flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]])) + } + + /** +* Applies a select function to the detected pattern sequence. 
For each pattern sequence the +* provided { @link PatternSelectFunction} is called. The pattern select function can produce +* exactly one resulting element. +* +* @param patternSelectFun The pattern select function which is called for each detected +* pattern sequence. +* @tparam R Type of the resulting elements +* @return { @link DataStream} which contains the resulting elements from the pattern select +* function. +*/ + def select[R: TypeInformation :
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249797#comment-15249797 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60399432 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import java.util.{Map => JMap} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream} +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.util.Collector +import scala.reflect.ClassTag + +/** + * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected + * pattern sequences as a map of events associated with their names. The pattern is detected using a + * { @link org.apache.flink.cep.nfa.NFA}. 
In order to process the detected sequences, the user + * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}. + * + * @param jPatternStream Underlying pattern stream from Java API + * @tparam T Type of the events + */ +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) { + + private[flink] def getWrappedPatternStream = jPatternStream + + /** +* Applies a select function to the detected pattern sequence. For each pattern sequence the +* provided { @link PatternSelectFunction} is called. The pattern select function can produce +* exactly one resulting element. +* +* @param patternSelectFunction The pattern select function which is called for each detected +* pattern sequence. +* @tparam R Type of the resulting elements +* @return { @link DataStream} which contains the resulting elements from the pattern select +* unction. +*/ + def select[R: TypeInformation : ClassTag](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R] = { +asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]])) + } + + /** +* Applies a flat select function to the detected pattern sequence. For each pattern sequence +* the provided { @link PatternFlatSelectFunction} is called. The pattern flat select function +* can produce an arbitrary number of resulting elements. +* +* @param patternFlatSelectFunction The pattern flat select function which is called for each +* detected pattern sequence. +* @tparam R Type of the resulting elements +* @return { @link DataStream} which contains the resulting elements from the pattern flat select +* function. +*/ + def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFunction: PatternFlatSelectFunction[T, R]): DataStream[R] = { +asScalaStream(jPatternStream.flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]])) + } + + /** +* Applies a select function to the detected pattern sequence. 
For each pattern sequence the +* provided { @link PatternSelectFunction} is called. The pattern select function can produce +* exactly one resulting element. +* +* @param patternSelectFun The pattern select function which is called for each detected +* pattern sequence. +* @tparam R Type of the resulting elements +* @return { @link DataStream} which contains the resulting elements from the pattern select +* function. +*/ + def select[R: TypeInformation :
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249796#comment-15249796 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60399384 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import java.util.{Map => JMap} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream} +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.util.Collector +import scala.reflect.ClassTag + +/** + * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected + * pattern sequences as a map of events associated with their names. The pattern is detected using a + * { @link org.apache.flink.cep.nfa.NFA}. 
In order to process the detected sequences, the user + * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}. + * + * @param jPatternStream Underlying pattern stream from Java API + * @tparam T Type of the events + */ +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) { + + private[flink] def getWrappedPatternStream = jPatternStream + + /** +* Applies a select function to the detected pattern sequence. For each pattern sequence the +* provided { @link PatternSelectFunction} is called. The pattern select function can produce +* exactly one resulting element. +* +* @param patternSelectFunction The pattern select function which is called for each detected +* pattern sequence. +* @tparam R Type of the resulting elements +* @return { @link DataStream} which contains the resulting elements from the pattern select +* function. +*/ + def select[R: TypeInformation : ClassTag](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R] = { --- End diff -- `ClassTag` not needed
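Some background on why the extra bound is redundant, as a sketch under stated assumptions. A context bound such as `R: TypeInformation` is shorthand for an additional implicit parameter, and each further bound adds another one. If the method body only consumes the `TypeInformation` evidence, a `ClassTag` bound adds an implicit parameter that is never used. `TypeInfo` below is a hypothetical stand-in for Flink's `TypeInformation`.

```scala
// Hypothetical stand-in for a type class such as Flink's TypeInformation.
trait TypeInfo[A] { def describe: String }

object TypeInfo {
  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { def describe: String = "String" }
}

object ContextBoundSketch {
  // Context-bound form: the compiler adds an implicit TypeInfo[R] parameter.
  def resultType[R: TypeInfo](value: R): String =
    implicitly[TypeInfo[R]].describe

  // Equivalent explicit form, showing the parameter the bound stands for.
  def resultTypeExplicit[R](value: R)(implicit info: TypeInfo[R]): String =
    info.describe
}
```

Both methods resolve the same implicit instance, which is why dropping an unused bound changes the signature but not the behaviour of the body.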
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249792#comment-15249792 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60399207 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import java.util.{Map => JMap} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream} +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.util.Collector +import scala.reflect.ClassTag + +/** + * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected + * pattern sequences as a map of events associated with their names. The pattern is detected using a + * { @link org.apache.flink.cep.nfa.NFA}. 
In order to process the detected sequences, the user + * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}. + * + * @param jPatternStream Underlying pattern stream from Java API + * @tparam T Type of the events + */ +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) { --- End diff -- `ClassTag` not needed > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
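To illustrate the review remark above: a context bound such as `T: TypeInformation : ClassTag` is shorthand for additional implicit parameters, so each bound obliges every caller to have that evidence in scope. The following is a minimal sketch, not the PR's code. `TypeInfo`, `Wrapper` and `WrapperExplicit` are hypothetical stand-ins (in particular, `TypeInfo` is not Flink's `TypeInformation`), used only to show that a wrapper which needs type information alone works without a `ClassTag` bound.

```scala
// Hypothetical stand-in for Flink's TypeInformation; not the real class.
trait TypeInfo[T] { def name: String }

object TypeInfo {
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { val name = "Int" }
  implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { val name = "String" }
}

// `[T: TypeInfo]` desugars to an extra parameter list `(implicit ev: TypeInfo[T])`.
// No ClassTag bound is declared because nothing in the body asks for one.
class Wrapper[T: TypeInfo](values: Seq[T]) {
  def describe: String = s"${implicitly[TypeInfo[T]].name} x ${values.size}"
}

// Equivalent explicit form of the same signature.
class WrapperExplicit[T](values: Seq[T])(implicit ev: TypeInfo[T]) {
  def describe: String = s"${ev.name} x ${values.size}"
}
```

A `ClassTag` bound would only be warranted if the body needed the runtime class of `T`, for example to create an `Array[T]`.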
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249791#comment-15249791 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60399173 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.cep.{CEP => JCEP} +import org.apache.flink.streaming.api.scala.DataStream + +import scala.reflect.ClassTag + +/** + * Utility method to transform a { @link DataStream} into a { @link PatternStream} to do CEP. + */ + +object CEP { + /** +* Transforms a { @link DataStream[T]} into a { @link PatternStream[T]} in the Scala API. +* See { @link org.apache.flink.cep.CEP} for a more detailed description how the underlying +* Java API works. 
--- End diff -- links in comments > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249788#comment-15249788 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60399038 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.cep.{CEP => JCEP} +import org.apache.flink.streaming.api.scala.DataStream + +import scala.reflect.ClassTag + +/** + * Utility method to transform a { @link DataStream} into a { @link PatternStream} to do CEP. 
--- End diff -- In ScalaDocs you comment using the syntax `[[DataStream]]` > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
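As a sketch of the link syntax mentioned in the comment above, the snippet below documents a small utility using Scaladoc's double-bracket links. `EventStream`, `MatchStream` and `Matcher` are hypothetical names invented for this illustration so that the links have something to point at; they are not part of Flink.

```scala
/** Hypothetical event stream type, defined here only so the links below resolve. */
trait EventStream[T]

/** Hypothetical pattern stream type, likewise only for illustration. */
trait MatchStream[T]

/**
  * Utility to transform an [[EventStream]] into a [[MatchStream]].
  *
  * Scaladoc's native link syntax is double square brackets around the target.
  */
object Matcher {
  /**
    * Wraps the given stream.
    *
    * @param input stream containing the input events
    * @tparam T type of the input events
    * @return a [[MatchStream]] over the same element type
    */
  def pattern[T](input: EventStream[T]): MatchStream[T] = new MatchStream[T] {}
}
```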
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249785#comment-15249785 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60398887 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.cep.{CEP => JCEP} +import org.apache.flink.streaming.api.scala.DataStream + +import scala.reflect.ClassTag + +/** + * Utility method to transform a { @link DataStream} into a { @link PatternStream} to do CEP. + */ + +object CEP { + /** +* Transforms a { @link DataStream[T]} into a { @link PatternStream[T]} in the Scala API. +* See { @link org.apache.flink.cep.CEP} for a more detailed description how the underlying +* Java API works. 
+* +* @param input DataStream containing the input events +* @param pattern Pattern specification which shall be detected +* @tparam T Type of the input events +* @return Resulting pattern stream +*/ + def pattern[T: TypeInformation : ClassTag](input: DataStream[T], pattern: Pattern[T, _]): PatternStream[T] = { --- End diff -- `ClassTag` not needed > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249783#comment-15249783 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60398671 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,209 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + + +org.apache.flink +flink-clients_2.10 +${project.version} + provided + + +org.apache.flink +flink-cep_2.10 +${project.version} + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +provided + + +org.apache.flink +flink-streaming-scala_2.10 +${project.version} +provided + + +org.apache.flink +flink-tests_2.10 +${project.version} +test +test-jar + + +org.apache.flink +flink-test-utils_2.10 +${project.version} +test + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +test +test-jar + + +org.apache.flink +flink-streaming-scala_2.10 +${project.version} +test +test-jar + + + + + + + +net.alchim31.maven +scala-maven-plugin +3.1.4 + + + +scala-compile-first +process-resources + +compile + + + + + +scala-test-compile +process-test-resources + +testCompile + + + + + +-Xms128m +-Xmx512m + + + +org.scalamacros + paradise_${scala.version} +${scala.macros.version} + + + + + + + +org.apache.maven.plugins +maven-eclipse-plugin +2.8 + +true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + +org.scala-lang:scala-library 
+org.scala-lang:scala-compiler + + +**/*.scala +**/*.java + + + + + + +org.codehaus.mojo +build-helper-maven-plugin +1.7 + + + +add-source +
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249782#comment-15249782 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60398573 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,209 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + + +org.apache.flink +flink-clients_2.10 +${project.version} + provided + + +org.apache.flink +flink-cep_2.10 +${project.version} + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +provided + + +org.apache.flink +flink-streaming-scala_2.10 +${project.version} +provided + + +org.apache.flink +flink-tests_2.10 +${project.version} +test +test-jar + + +org.apache.flink +flink-test-utils_2.10 +${project.version} +test + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +test +test-jar + + +org.apache.flink +flink-streaming-scala_2.10 +${project.version} +test +test-jar + + + + + + + +net.alchim31.maven +scala-maven-plugin +3.1.4 + + + +scala-compile-first +process-resources + +compile + + + + + +scala-test-compile +process-test-resources + +testCompile + + + + + +-Xms128m +-Xmx512m + + + +org.scalamacros + paradise_${scala.version} +${scala.macros.version} + + + + + + + +org.apache.maven.plugins +maven-eclipse-plugin +2.8 + +true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + +org.scala-lang:scala-library 
+org.scala-lang:scala-compiler + + +**/*.scala +**/*.java + + + --- End diff -- I'm not sure how relevant this is anymore. > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement >
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249780#comment-15249780 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60398456 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,209 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + + +org.apache.flink +flink-clients_2.10 +${project.version} + provided + + +org.apache.flink +flink-cep_2.10 +${project.version} + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +provided + + +org.apache.flink +flink-streaming-scala_2.10 +${project.version} +provided + + +org.apache.flink +flink-tests_2.10 +${project.version} +test +test-jar + + +org.apache.flink +flink-test-utils_2.10 +${project.version} +test + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +test +test-jar + + +org.apache.flink +flink-streaming-scala_2.10 +${project.version} +test +test-jar + + + + + + + +net.alchim31.maven +scala-maven-plugin +3.1.4 + + + +scala-compile-first +process-resources + +compile + + + + + +scala-test-compile +process-test-resources + +testCompile + + + + + +-Xms128m +-Xmx512m + + + +org.scalamacros + paradise_${scala.version} +${scala.macros.version} + + + + --- End diff -- I think you can replace this with ``` org.scala-tools maven-scala-plugin 2.15.2 compile testCompile src/main/scala src/test/scala -Xms64m -Xmx1024m ``` Since we don't have a mixed Java Scala project here. 
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249759#comment-15249759 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60397159 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,209 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + + +org.apache.flink +flink-clients_2.10 +${project.version} + provided + + +org.apache.flink +flink-cep_2.10 +${project.version} + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +provided + + +org.apache.flink +flink-streaming-scala_2.10 +${project.version} +provided + + +org.apache.flink +flink-tests_2.10 +${project.version} +test +test-jar + + +org.apache.flink +flink-test-utils_2.10 +${project.version} +test + + +org.apache.flink +flink-streaming-java_2.10 +${project.version} +test +test-jar + --- End diff -- Not needed > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249755#comment-15249755 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60397035 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,209 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + --- End diff -- Not needed > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249756#comment-15249756 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60397042 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,209 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + + +org.apache.flink +flink-clients_2.10 +${project.version} + provided + --- End diff -- Not needed > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249719#comment-15249719 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60394562 --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +object StreamEvent { + def of[V](event: V, timestamp: Long): StreamEvent[V] = { +new StreamEvent[V](event, timestamp) + } +} + +class StreamEvent[T](val event: T, val timestamp: Long) { --- End diff -- `StreamEvent` could be defined as a case class. Then you don't have to define the factory methods. > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. 
In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
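A minimal sketch of the case class suggestion above, assuming the test helper needs nothing beyond the two fields shown in the diff. The wrapper object `StreamEventExample` and its values are made up for illustration.

```scala
// A case class gets a companion `apply`, structural equality, `copy`
// and a readable `toString` generated by the compiler.
final case class StreamEvent[T](event: T, timestamp: Long)

object StreamEventExample {
  // Construction without `new` and without a hand-written `of` factory.
  val first: StreamEvent[String] = StreamEvent("start", 1L)

  // `copy` derives a modified instance while leaving `first` untouched.
  val later: StreamEvent[String] = first.copy(timestamp = 5L)
}
```

Call sites that previously used `StreamEvent.of(event, timestamp)` would then read `StreamEvent(event, timestamp)`.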
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249717#comment-15249717 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60394471 --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +object StreamEvent { + def of[V](event: V, timestamp: Long): StreamEvent[V] = { +new StreamEvent[V](event, timestamp) + } +} + +class StreamEvent[T](val event: T, val timestamp: Long) { + --- End diff -- line breaks unnecessary > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. 
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249712#comment-15249712 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60394116 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java --- @@ -61,19 +61,36 @@ public DataStream select(final PatternSelectFunctionpatternSelectFunction) { // we have to extract the output type from the provided pattern selection function manually // because the TypeExtractor cannot do that if the method is wrapped in a MapFunction - TypeInformation outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( - patternSelectFunction, - PatternSelectFunction.class, - 1, - -1, - inputType, - null, - false); + TypeInformation returnType = TypeExtractor.getUnaryOperatorReturnType( + patternSelectFunction, + PatternSelectFunction.class, + 1, + -1, + inputType, + null, + false); --- End diff -- Indentation > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249708#comment-15249708 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60393928 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java --- @@ -99,10 +116,26 @@ null, false); + return flatSelect(patternFlatSelectFunction, outTypeInfo); + } + + /** +* Applies a flat select function to the detected pattern sequence. For each pattern sequence +* the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function +* can produce an arbitrary number of resulting elements. +* +* @param patternFlatSelectFunction The pattern flat select function which is called for each +* detected pattern sequence. +* @param Typ of the resulting elements +* @param outTypeInfo Explicit specification of output type. +* @return {@link DataStream} which contains the resulting elements from the pattern flat select +* function. +*/ + public DataStream flatSelect(final PatternFlatSelectFunctionpatternFlatSelectFunction, TypeInformation outTypeInfo) { return patternStream.flatMap( - new PatternFlatSelectMapper ( - patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction) - )).returns(outTypeInfo); + new PatternFlatSelectMapper ( + patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction) + )).returns(outTypeInfo); } --- End diff -- Indentation off in this file. > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. 
In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
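The Javadoc quoted in the diff above says that a flat select function may produce an arbitrary number of results per detected sequence, whereas a select function produces exactly one. The sketch below models that difference with plain Scala collections. It is an analogy under stated assumptions, not the CEP implementation: `SelectSketch`, the `Match` alias and the callback-style collector are hypothetical and stand in for `PatternStream`, the map of named events and Flink's `Collector`.

```scala
import scala.collection.mutable.ArrayBuffer

object SelectSketch {
  // A detected pattern sequence: events keyed by the name of the pattern operator.
  type Match[T] = Map[String, T]

  // select: exactly one result per detected sequence (behaves like map).
  def select[T, R](matches: Seq[Match[T]])(f: Match[T] => R): Seq[R] =
    matches.map(f)

  // flatSelect: the function receives a collector callback and may emit
  // zero, one or many results per detected sequence (behaves like flatMap).
  def flatSelect[T, R](matches: Seq[Match[T]])(f: (Match[T], R => Unit) => Unit): Seq[R] = {
    val out = ArrayBuffer.empty[R]
    matches.foreach(m => f(m, r => out += r))
    out.toList
  }
}
```

With this sketch, `SelectSketch.flatSelect[Int, Int](matches)((m, emit) => m.values.foreach(emit))` emits every event of every match. The type arguments are written out because the result type cannot be inferred from the callback alone.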
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249609#comment-15249609 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60383831 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName + + /** +* +* @return Window length in which the pattern match has to occur +*/ + def getWindowTime: Option[Time] = { +val time = jPattern.getWindowTime +if (time == null) None else Some(time) + } + + /** +* +* @return Filter condition for an event to be matched +*/ + def getFilterFunction: Option[FilterFunction[F]] = { +val filterFun = jPattern.getFilterFunction +if (filterFun == null) None else Some(filterFun) + } + + /** +* Applies a subtype constraint on the current pattern operator. This means that an event has +* to be of the given subtype in order to be matched. +* +* @param clazz Class of the subtype +* @tparam S Type of the subtype +* @return The same pattern operator with the new subtype constraint +*/ + def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = { +jPattern.subtype(clazz) +this.asInstanceOf[Pattern[T, S]] + } + + /** +* Defines the maximum time interval for a matching pattern. This means that the time gap +* between first and the last event must not be longer than the window time. +* +* @param windowTime Time of the matching window +* @return The same pattern operator with the new window length +*/ + def within(windowTime: Time): Pattern[T, F] = { +jPattern.within(windowTime) +this + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces strict +* temporal contiguity. 
This means that the whole pattern only matches if an event which matches +* this operator directly follows the preceding matching event. Thus, there cannot be any +* events in between two matching events. +* +* @param name Name of the new pattern operator +* @return A new pattern operator which is appended to this pattern operator +*/ + def next(name: String): Pattern[T, T] = { +wrapPattern(jPattern.next(name)) + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces +* non-strict temporal contiguity. This means that a matching
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249608#comment-15249608 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60383704 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName + + /** +* +* @return Window length in which the pattern match has to occur +*/ + def getWindowTime: Option[Time] = { +val time = jPattern.getWindowTime +if (time == null) None else Some(time) + } + + /** +* +* @return Filter condition for an event to be matched +*/ + def getFilterFunction: Option[FilterFunction[F]] = { +val filterFun = jPattern.getFilterFunction +if (filterFun == null) None else Some(filterFun) + } + + /** +* Applies a subtype constraint on the current pattern operator. This means that an event has +* to be of the given subtype in order to be matched. +* +* @param clazz Class of the subtype +* @tparam S Type of the subtype +* @return The same pattern operator with the new subtype constraint +*/ + def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = { +jPattern.subtype(clazz) +this.asInstanceOf[Pattern[T, S]] + } + + /** +* Defines the maximum time interval for a matching pattern. This means that the time gap +* between first and the last event must not be longer than the window time. +* +* @param windowTime Time of the matching window +* @return The same pattern operator with the new window length +*/ + def within(windowTime: Time): Pattern[T, F] = { +jPattern.within(windowTime) +this + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces strict +* temporal contiguity. 
This means that the whole pattern only matches if an event which matches +* this operator directly follows the preceding matching event. Thus, there cannot be any +* events in between two matching events. +* +* @param name Name of the new pattern operator +* @return A new pattern operator which is appended to this pattern operator +*/ + def next(name: String): Pattern[T, T] = { +wrapPattern(jPattern.next(name)) + } + + /** +* Appends a new pattern operator to the existing one. The new pattern operator enforces +* non-strict temporal contiguity. This means that a matching
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249605#comment-15249605 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60383389 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName + + /** +* +* @return Window length in which the pattern match has to occur +*/ + def getWindowTime: Option[Time] = { +val time = jPattern.getWindowTime +if (time == null) None else Some(time) + } + + /** +* +* @return Filter condition for an event to be matched +*/ + def getFilterFunction: Option[FilterFunction[F]] = { +val filterFun = jPattern.getFilterFunction +if (filterFun == null) None else Some(filterFun) --- End diff -- Same here > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249599#comment-15249599 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60383097 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName + + /** +* +* @return Window length in which the pattern match has to occur +*/ + def getWindowTime: Option[Time] = { +val time = jPattern.getWindowTime +if (time == null) None else Some(time) --- End diff -- It's easier to simply write `Option(jPattern.getWindowTime())` > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
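The review suggestion above relies on the standard-library behavior that `Option.apply` maps `null` to `None` and any other value to `Some`. A minimal sketch of the equivalence follows; `javaStyleGetter` is a hypothetical stand-in for a Java getter such as `jPattern.getWindowTime` and is not part of the pull request.

```scala
// Sketch only: javaStyleGetter is a hypothetical stand-in for a Java API
// getter that returns null when no value has been set.
object OptionWrapping {
  def javaStyleGetter(present: Boolean): String =
    if (present) "10 s" else null

  // Verbose form, mirroring the code in the diff under review.
  def verbose(present: Boolean): Option[String] = {
    val value = javaStyleGetter(present)
    if (value == null) None else Some(value)
  }

  // Concise form: Option.apply yields None for null, Some(x) otherwise.
  def concise(present: Boolean): Option[String] =
    Option(javaStyleGetter(present))
}
```

Both forms return the same result for null and non-null inputs, so the shorter one can replace the explicit null check without changing behavior.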
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249598#comment-15249598 ] ASF GitHub Bot commented on FLINK-3708: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60382954 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * { @code + * Patternpattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + --- End diff -- two line break > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249561#comment-15249561 ] Aljoscha Krettek commented on FLINK-3708: - I think it was required because we cannot use the dependency management for ASM because it is shaded. Some weird problem that went away when explicitly including it in gelly...
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247487#comment-15247487 ] ASF GitHub Bot commented on FLINK-3708: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1905#issuecomment-211843938 It looks like it was added as part of [FLINK-3136](https://issues.apache.org/jira/browse/FLINK-3136).
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247478#comment-15247478 ] ASF GitHub Bot commented on FLINK-3708: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1905#issuecomment-211840791 Does anybody know why we needed to add the asm dependency for gelly scala? @tillrohrmann @vasia ?
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247413#comment-15247413 ] ASF GitHub Bot commented on FLINK-3708: --- Github user StefanRRichter commented on the pull request: https://github.com/apache/flink/pull/1905#issuecomment-211819580 Thanks Ufuk! I just saw that the build actually failed, and it seems to me that the problem is indeed related to removing the asm dependency from the pom.xml, as Robert suggested. So I could either add the dependency again, or do you see another way to solve this? There is one remaining question, which I already started to discuss with Till, about the select functions in PatternStream. The Java API calls them with java.util.Map as the parameter, but I don't think we want Java classes in our Scala API. Currently I see two options for changing this without touching the Java API: 1) Use Scala mutable.Map and automatic conversion. 2) Use Scala immutable.Map and explicit conversion. From how I interpret the actual use of the Map in this function, the semantics are actually those of an immutable map. If the impact of the conversion is not an issue, I have already implemented version 2 (it is currently commented out in favor of a version that still uses the Java Map).
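The two options discussed above can be sketched with the standard library alone. This is an illustration under assumptions, not the code from the pull request: the object and method names are hypothetical, and the sketch uses the explicit `asScala` decorator from `scala.collection.JavaConverters` for both options, whereas the comment mentions automatic (implicit) conversion for option 1.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical helper names; only the standard-library calls are real.
object MapConversion {
  // Option 1: wrap the Java map in a mutable Scala view. No entries are
  // copied, so later changes to the Java map remain visible through the view.
  def asMutableView[K, V](jMap: JMap[K, V]): mutable.Map[K, V] =
    jMap.asScala

  // Option 2: copy the entries into an immutable Scala map. This costs one
  // pass over the entries but decouples the result from the Java map.
  def asImmutableCopy[K, V](jMap: JMap[K, V]): Map[K, V] =
    jMap.asScala.toMap
}
```

The trade-off is the one raised in the comment: the view avoids the cost of a copy, while the immutable copy matches the read-only way the map is used in a select function.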
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247350#comment-15247350 ] ASF GitHub Bot commented on FLINK-3708: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1905#issuecomment-211784417 Thank you for this great contribution! I think that CEP and a concise Scala API go very well together. :-) I'm not a Scala expert, but I'll also have a look later.
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246128#comment-15246128 ] ASF GitHub Bot commented on FLINK-3708: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60102619 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern} + +import scala.reflect.ClassTag + + +object FollowedByPattern { + def apply[T : ClassTag, F <: T : ClassTag] + (jfbPattern: JFollowedByPattern[T, F]) = new FollowedByPattern[T, F](jfbPattern) +} + +/** + * Created by stefan on 16.04.16. --- End diff -- Can you remove this? 
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246165#comment-15246165 ] ASF GitHub Bot commented on FLINK-3708: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60104038 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern} + +import scala.reflect.ClassTag + + +object FollowedByPattern { + def apply[T : ClassTag, F <: T : ClassTag] + (jfbPattern: JFollowedByPattern[T, F]) = new FollowedByPattern[T, F](jfbPattern) +} + +/** + * Created by stefan on 16.04.16. --- End diff -- Sure. Thought I already removed all of it. 
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246163#comment-15246163 ] ASF GitHub Bot commented on FLINK-3708: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60103959 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,216 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + + --- End diff -- I initially took the pom.xml from gelly-scala as blueprint. I adopted the line from there; there was not any particular problem for me. > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246119#comment-15246119 ] ASF GitHub Bot commented on FLINK-3708: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60102211 --- Diff: flink-libraries/flink-cep-scala/pom.xml --- @@ -0,0 +1,216 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-libraries +1.1-SNAPSHOT +.. + +flink-cep-scala_2.10 +flink-cep-scala +jar + + + +org.apache.flink +flink-scala_2.10 +${project.version} + provided + + --- End diff -- What exactly was the problem here? Ideally the module inherits the shading plugin from the parent and everything is shaded correctly. > Scala API for CEP > - > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter > > Currently, The CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3708) Scala API for CEP
[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245917#comment-15245917 ] ASF GitHub Bot commented on FLINK-3708: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/1905 [FLINK-3708] Scala API for CEP (initial). The new flink-cep-scala module adds a Scala counterpart to the Java CEP API in Flink. I created Scala classes for Pattern and PatternStream, helper classes, as well as corresponding tests. PatternStream in flink-cep is extended to obtain explicit TypeInformation from Scala code. Pattern in flink-cep is slightly modified in the type parameters for member previous. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink dev-cep-scala Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1905.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1905 commit 306210da82c94f6770e22690799a3f1af9a877e7 Author: Stefan Richter Date: 2016-04-18T11:10:09Z Preparations in Pattern and PatternStream of flink-cep to support flink-cep-scala functionality. commit 731d96ae78c4dc1b7125966900886225374a3eb5 Author: Stefan Richter Date: 2016-04-18T11:10:35Z Initial commit for flink-cep-scala API. commit 7da7c14787226d9aa5914a138f632b8b6d5f1335 Author: Stefan Richter Date: 2016-04-18T13:48:51Z [FLINK-3708] Added missing test dependency to pom. commit 1468bd91aea7d2a757ddeea8b8506b5ad9f3d289 Author: Stefan Richter Date: 2016-04-18T14:50:53Z [FLINK-3708] Pattern in Scala API now uses Option to shield users against null values from the Java API commit 9ad8719c5b8767ec2fb0425f55cc7df105e36bcb Author: Stefan Richter Date: 2016-04-18T16:04:35Z [FLINK-3708] Added test-jar for build phase to pom.