[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15258427#comment-15258427
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1905


> Scala API for CEP
> -
>
> Key: FLINK-3708
> URL: https://issues.apache.org/jira/browse/FLINK-3708
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>
> Currently, the CEP library does not support Scala case classes, because the 
> {{TypeExtractor}} cannot handle them. In order to support them, it would be 
> necessary to offer a Scala API for the CEP library.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15258406#comment-15258406
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-214806804
  
Failing test case is unrelated. Really good work @StefanRRichter. Will 
merge it. 

As a follow up, we should update the CEP documentation to also include 
Scala code examples.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15257838#comment-15257838
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-214688703
  
All mentioned issues should be addressed now and the PR is ready.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15257831#comment-15257831
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r61057258
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  *
+  * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
+  * a [[org.apache.flink.cep.nfa.NFA]].
+  *
+  * {{{
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * }}}
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is constrained
+  */
+class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
--- End diff --

Ok, I truly wasn't aware of that. But it makes sense of course :-)




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254004#comment-15254004
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-213443445
  
The current version should cover all of your feedback and is ready to be 
merged, @tillrohrmann.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15251547#comment-15251547
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60542216
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
 ---
@@ -0,0 +1,42 @@
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+
+import _root_.scala.reflect.ClassTag
+
+package object pattern {
+  /**
+* Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
+* for usage with the Scala API.
+*
+* @param javaPattern The underlying pattern from the Java API
+* @tparam T Base type of the elements appearing in the pattern
+* @tparam F Subtype of T to which the current pattern operator is constrained
+* @return A pattern from the Scala API which wraps the pattern from the Java API
+*/
+  private[flink] def wrapPattern[
+  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
+  : Pattern[T, F] = javaPattern match {
+case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
+case p: JPattern[T, F] => Pattern[T, F](p)
+case _ => null
--- End diff --

Oh yes, you're totally right Stefan. But then we could maybe return an 
`Option[Pattern]` from the `wrapPattern` method.
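
A minimal sketch of the `Option`-returning variant suggested here. The classes below are hypothetical stand-ins defined only so the snippet is self-contained; they are not the real Flink types, and the `ClassTag` bounds from the diff are left out on the assumption that they are not needed.

```scala
object WrapPatternSketch {
  // Hypothetical stand-ins for the Java API classes (not the real Flink types).
  class JPattern[T, F <: T](val name: String)
  class JFollowedByPattern[T, F <: T](name: String) extends JPattern[T, F](name)

  // Hypothetical stand-ins for the Scala wrappers.
  class Pattern[T, F <: T](val jPattern: JPattern[T, F])
  class FollowedByPattern[T, F <: T](jfbPattern: JFollowedByPattern[T, F])
    extends Pattern[T, F](jfbPattern)

  // A null Java pattern becomes None instead of a null Scala pattern.
  def wrapPattern[T, F <: T](javaPattern: JPattern[T, F]): Option[Pattern[T, F]] =
    javaPattern match {
      case null => None
      case f: JFollowedByPattern[T, F] => Some(new FollowedByPattern[T, F](f))
      case p => Some(new Pattern[T, F](p))
    }
}
```

With this shape, callers never see a null wrapper and can chain `map` or `getOrElse` on the result.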




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250334#comment-15250334
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60452526
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
 ---
@@ -0,0 +1,42 @@
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+
+import _root_.scala.reflect.ClassTag
+
+package object pattern {
+  /**
+* Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
+* for usage with the Scala API.
+*
+* @param javaPattern The underlying pattern from the Java API
+* @tparam T Base type of the elements appearing in the pattern
+* @tparam F Subtype of T to which the current pattern operator is constrained
+* @return A pattern from the Scala API which wraps the pattern from the Java API
+*/
+  private[flink] def wrapPattern[
+  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
+  : Pattern[T, F] = javaPattern match {
+case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
+case p: JPattern[T, F] => Pattern[T, F](p)
+case _ => null
--- End diff --

I am not sure if this case should trigger an exception, because the Java 
API is actually allowed to return null here in case there is no previous 
pattern. Our pattern will then wrap this as an undefined Option.
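
The behaviour described here can be sketched roughly as follows. `JPattern`, `Pattern` and `wrap` are hypothetical stand-ins, not the classes from the PR; the only assumption carried over from the comment is that the Java side returns null when there is no previous pattern.

```scala
object PreviousPatternSketch {
  // Hypothetical stand-in for the Java pattern; getPrevious is null for the first operator.
  class JPattern(val getName: String, val getPrevious: JPattern)

  // Hypothetical Scala wrapper around the Java pattern.
  class Pattern(jPattern: JPattern) {
    def getName: String = jPattern.getName

    // Option(...) turns the null returned by wrap into None.
    def getPrevious: Option[Pattern] = Option(Pattern.wrap(jPattern.getPrevious))
  }

  object Pattern {
    // Passes null through instead of throwing, mirroring the `case _ => null` branch.
    def wrap(j: JPattern): Pattern = if (j == null) null else new Pattern(j)
  }
}
```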




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249910#comment-15249910
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-212429457
  
Thanks for your good contribution @StefanRRichter. I had some inline 
comments.

It would be good to revisit the link tags in the ScalaDocs and replace them 
with the ScalaDoc link syntax. I haven't marked all occurrences in the code. 

Furthermore, it should be possible to completely throw out the `ClassTag` 
context bound. I have only marked the first occurrences where I've noticed it.

The Scala code contained several lines which exceeded the maximum line 
length of 100 characters. The Scala style check plugin, which detects these 
style violations, is executed when you run `mvn verify`. Thus, it is always a 
good idea to run `mvn verify` once locally before pushing commits to a PR.

Ping me once you've addressed my comments. Then I'll make sure that the PR 
is merged.
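
As a rough illustration of the first two points, the sketch below uses ScalaDoc's double-bracket link syntax and declares the wrapper without a `ClassTag` context bound. `JPattern` is a hypothetical stand-in for the Java class, and whether the real wrapper can drop `ClassTag` everywhere is an assumption; here it works because the wrapper only delegates and never inspects runtime classes.

```scala
object ScalaDocSketch {
  // Hypothetical stand-in for the Java pattern class.
  class JPattern[T, F <: T](val getName: String)

  /**
    * Wraps a [[JPattern]] for use from Scala.
    *
    * ScalaDoc links are written with double square brackets, for example
    * [[scala.Option]], where Javadoc would use a link tag.
    *
    * @param jPattern Underlying Java API pattern
    * @tparam T Base type of the elements appearing in the pattern
    * @tparam F Subtype of T to which the current pattern operator is constrained
    */
  class Pattern[T, F <: T](jPattern: JPattern[T, F]) { // no ClassTag context bound
    def getName: String = jPattern.getName
  }
}
```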




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249889#comment-15249889
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60407197
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala
 ---
@@ -0,0 +1,258 @@
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.TemporaryFolder
+
+
+@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
--- End diff --

I think we should not simply copy the Java `CEPITCase` here because it will 
test to a large extent what is already tested by the Java `CEPITCase`.

What's more interesting to test is whether the Scala CEP can work with pure 
Scala types, such as a Scala tuple or a Scala collection. 

Furthermore, it would be interesting to see whether the `select` call is 
correctly forwarded to the Scala function. It should not be strictly necessary 
to write an IT case for that. You should be able to extract the mapper function 
from the `Transformation` and then simply pass a test value into it. If you see 
it in the Scala function, then the forwarding should be correct.
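
A minimal sketch of such a forwarding check, under stated assumptions: `JPatternSelectFunction` and `toJava` are hypothetical stand-ins for the Java select interface and the Scala-to-Java adapter, and the step of extracting the function from the `Transformation` is skipped. The test value uses a Scala tuple to cover the pure-Scala-type case as well.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

object SelectForwardingSketch {
  // Hypothetical stand-in for the Java select-function interface.
  trait JPatternSelectFunction[IN, OUT] {
    def select(pattern: JMap[String, IN]): OUT
  }

  // Hypothetical adapter from a Scala function to the Java-style interface.
  def toJava[IN, OUT](f: JMap[String, IN] => OUT): JPatternSelectFunction[IN, OUT] =
    new JPatternSelectFunction[IN, OUT] {
      override def select(pattern: JMap[String, IN]): OUT = f(pattern)
    }

  // Passes a test value through the adapter and reports what the Scala function saw.
  def forwardedValue(): Option[(Int, String)] = {
    var seen: Option[(Int, String)] = None
    val fn = toJava[(Int, String), Int] { m =>
      seen = Option(m.get("start"))
      m.size()
    }
    val input = new JHashMap[String, (Int, String)]()
    input.put("start", (1, "a"))
    fn.select(input)
    seen
  }
}
```

If the value recorded inside the Scala function equals the value passed to `select`, the call was forwarded.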




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249870#comment-15249870
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60404836
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
 ---
@@ -0,0 +1,132 @@
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.cep.pattern.SubtypeFilterFunction
+import org.apache.flink.cep.scala._
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternTest {
--- End diff --

I think that we shouldn't simply copy the Java `PatternTest` here.

It would be better to test that the Scala `Pattern` is correctly 
represented by the Java `Pattern`, because that representation is used in the 
end to compile the `NFA`.
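
One way such a test could look, sketched with hypothetical stand-in classes rather than the real Flink `Pattern` types: build a pattern through the Scala wrapper, then inspect the wrapped Java-side chain that would later be compiled.

```scala
object PatternRepresentationSketch {
  // Hypothetical stand-in for the Java pattern: a name plus a nullable predecessor.
  class JPattern(val getName: String, val getPrevious: JPattern) {
    def next(name: String): JPattern = new JPattern(name, this)
  }

  object JPattern {
    def begin(name: String): JPattern = new JPattern(name, null)
  }

  // Hypothetical Scala wrapper that delegates every call to the Java pattern.
  class Pattern(val wrappedPattern: JPattern) {
    def next(name: String): Pattern = new Pattern(wrappedPattern.next(name))
  }

  object Pattern {
    def begin(name: String): Pattern = new Pattern(JPattern.begin(name))
  }

  // Collects operator names on the Java side, from the last operator back to the first.
  def names(p: JPattern): List[String] =
    if (p == null) Nil else p.getName :: names(p.getPrevious)
}
```

A test would then compare `names(scalaPattern.wrappedPattern)` against the operators declared through the Scala API.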




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249856#comment-15249856
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60403142
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/Event.scala
 ---
@@ -0,0 +1,53 @@
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TypeExtractor
+
+object Event {
+  def createTypeSerializer: TypeSerializer[Event] = {
+val typeInformation: TypeInformation[Event] = TypeExtractor.createTypeInfo(classOf[Event])
+return typeInformation.createSerializer(new ExecutionConfig)
+  }
+}
+
+class Event(var id: Int, var name: String, var price: Double) {
--- End diff --

Why not simply reuse the Java `Event` class from the flink-cep test jar?




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249858#comment-15249858
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60403183
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala
 ---
@@ -0,0 +1,28 @@
+package org.apache.flink.cep.scala
+
+object StreamEvent {
+  def of[V](event: V, timestamp: Long): StreamEvent[V] = {
+new StreamEvent[V](event, timestamp)
+  }
+}
+
+class StreamEvent[T](val event: T, val timestamp: Long) {
--- End diff --

We could reuse the `StreamEvent` class from `flink-cep`.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249852#comment-15249852
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60402984
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala
 ---
@@ -0,0 +1,258 @@
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.TemporaryFolder
+
+
+@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
+  private var resultPath: String = null
+  private var expected: String = null
+  val _tempFolder = new TemporaryFolder
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  @throws[Exception]
+  def before {
+resultPath = tempFolder.newFile.toURI.toString
+expected = ""
+  }
+
+  @After
+  @throws[Exception]
+  def after {
+TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  @throws[Exception]
+  def testSimplePatternCEP {
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+val input: DataStream[Event] = env.fromElements(
+  new Event(1, "barfoo", 1.0),
+  new Event(2, "start", 2.0),
+  new Event(3, "foobar", 3.0),
+  new SubEvent(4, "foo", 4.0, 1.0),
+  new Event(5, "middle", 5.0),
+  new SubEvent(6, "middle", 6.0, 2.0),
+  new SubEvent(7, "bar", 3.0, 3.0),
+  new Event(42, "42", 42.0),
+  new Event(8, "end", 1.0))
+val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
+  .where((value: Event) => value.name == "start")
+  .followedBy("middle")
+  .subtype(classOf[SubEvent])
+  .where((value: SubEvent) => value.name == "middle")
+  .followedBy("end")
+  .where((value: Event) => value.name == "end")
+val result: DataStream[String] = CEP.pattern(input, pattern)
+  .select((pattern: JMap[String, Event]) => {
+val builder: StringBuilder = new StringBuilder
+builder.append(pattern.get("start").id)
+  .append(",")
+  .append(pattern.get("middle").id)
+  .append(",")
+  .append(pattern.get("end").id)
+  .toString
+  })
+result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
+expected = "2,6,8"
+env.execute
+  }
+
+  @Test
+  @throws[Exception]
+  def testSimpleKeyedPatternCEP {
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(2)
+val input: DataStream[Event] = env.fromElements(
+  new Event(1, "barfoo", 1.0),
+  new Event(2, "start", 2.0),
+  new Event(3, "start", 2.1),
+  new Event(3, "foobar", 3.0),
+  new SubEvent(4, "foo", 4.0, 1.0),
+  new SubEvent(3, "middle", 3.2, 1.0),
+  new Event(42, "start", 3.1),
+  new SubEvent(42, "middle", 3.3, 1.2),
+  new Event(5, "middle", 5.0),
+  new SubEvent(2, "middle", 6.0, 2.0),
+  new SubEvent(7, "bar", 3.0, 

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249847#comment-15249847
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60402681
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala
 ---
@@ -0,0 +1,258 @@
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.TemporaryFolder
+
+
+@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
--- End diff --

Just my personal preference: I would put the annotation in an extra line.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249842#comment-15249842
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60402439
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+
+import _root_.scala.reflect.ClassTag
+
+package object pattern {
+  /**
+* Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
+* for usage with the Scala API.
+*
+* @param javaPattern The underlying pattern from the Java API
+* @tparam T Base type of the elements appearing in the pattern
+* @tparam F Subtype of T to which the current pattern operator is constrained
+* @return A pattern from the Scala API which wraps the pattern from the Java API
+*/
+  private[flink] def wrapPattern[
+  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
+  : Pattern[T, F] = javaPattern match {
+case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
+case p: JPattern[T, F] => Pattern[T, F](p)
+case _ => null
--- End diff --

The `null` case should throw an exception, because we cannot continue from 
here.
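
A minimal sketch of what failing fast could look like here. The classes below are hypothetical stand-ins so the snippet compiles without Flink on the classpath, and `IllegalArgumentException` is only an assumed choice of exception type:

```scala
object WrapPatternSketch {
  // Hypothetical stand-ins for the Java API classes (not the real Flink types).
  class JPattern(val name: String)
  class JFollowedByPattern(name: String) extends JPattern(name)

  // Hypothetical stand-ins for the Scala wrappers.
  class Pattern(val jPattern: JPattern)
  class FollowedByPattern(jfp: JFollowedByPattern) extends Pattern(jfp)

  // Wraps a Java-side pattern and fails fast instead of handing back null.
  def wrapPattern(javaPattern: JPattern): Pattern = javaPattern match {
    case f: JFollowedByPattern => new FollowedByPattern(f)
    case p: JPattern => new Pattern(p)
    // Type patterns never match null, so a null argument ends up in this case.
    case _ => throw new IllegalArgumentException("Cannot wrap a null pattern.")
  }
}
```

With this shape the caller sees the failure at the wrapping site rather than a `NullPointerException` somewhere later in the chain.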




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249841#comment-15249841
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60402382
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
+
+  /**
+*
+* @return Window length in which the pattern match has to occur
+*/
+  def getWindowTime: Option[Time] = {
+val time = jPattern.getWindowTime
+if (time == null) None else Some(time)
+  }
+
+  /**
+*
+* @return Filter condition for an event to be matched
+*/
+  def getFilterFunction: Option[FilterFunction[F]] = {
+val filterFun = jPattern.getFilterFunction
+if (filterFun == null) None else Some(filterFun)
+  }
+
+  /**
+* Applies a subtype constraint on the current pattern operator. This means that an event has
+* to be of the given subtype in order to be matched.
+*
+* @param clazz Class of the subtype
+* @tparam S Type of the subtype
+* @return The same pattern operator with the new subtype constraint
+*/
+  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
+jPattern.subtype(clazz)
+this.asInstanceOf[Pattern[T, S]]
+  }
+
+  /**
+* Defines the maximum time interval for a matching pattern. This means that the time gap
+* between first and the last event must not be longer than the window time.
+*
+* @param windowTime Time of the matching window
+* @return The same pattern operator with the new window length
+*/
+  def within(windowTime: Time): Pattern[T, F] = {
+jPattern.within(windowTime)
+this
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern operator enforces strict
+* temporal contiguity. This means that the whole pattern only matches if an event which matches
+* this operator directly follows the preceding matching event. Thus, there cannot be any
+* events in between two matching events.
+*
+* @param name Name of the new pattern operator
+* @return A new pattern operator which is appended to this pattern operator
+*/
+  def next(name: String): Pattern[T, T] = {
+wrapPattern(jPattern.next(name))
--- End diff --

Could be done more efficiently with `Pattern[T, T](jPattern.next(name))`.
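
To illustrate the idea: when the static return type of the Java call is already the plain pattern class, the wrapper can be constructed directly and the runtime type match in `wrapPattern` is skipped. A rough sketch with hypothetical stand-in classes (type parameters and class tags omitted, nothing here is the real Flink API):

```scala
object NextSketch {
  // Hypothetical stand-in for the Java pattern class.
  class JPattern(val name: String) {
    // Statically returns a plain JPattern, never a subclass.
    def next(name: String): JPattern = new JPattern(name)
  }

  class Pattern(jPattern: JPattern) {
    def getName: String = jPattern.name

    // No runtime dispatch needed: the result type is known at compile time,
    // so the wrapper is built directly through the companion's apply.
    def next(name: String): Pattern = Pattern(jPattern.next(name))
  }

  object Pattern {
    def apply(jPattern: JPattern): Pattern = new Pattern(jPattern)
    def begin(name: String): Pattern = new Pattern(new JPattern(name))
  }
}
```

The generic `wrapPattern` helper would then only be needed where the concrete subclass is not known at the call site.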



[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249834#comment-15249834
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60401939
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
+
+  /**
+*
+* @return Window length in which the pattern match has to occur
+*/
+  def getWindowTime: Option[Time] = {
+val time = jPattern.getWindowTime
+if (time == null) None else Some(time)
+  }
+
+  /**
+*
+* @return Filter condition for an event to be matched
+*/
+  def getFilterFunction: Option[FilterFunction[F]] = {
+val filterFun = jPattern.getFilterFunction
+if (filterFun == null) None else Some(filterFun)
+  }
+
+  /**
+* Applies a subtype constraint on the current pattern operator. This 
means that an event has
+* to be of the given subtype in order to be matched.
+*
+* @param clazz Class of the subtype
+* @tparam S Type of the subtype
+* @return The same pattern operator with the new subtype constraint
+*/
+  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
+jPattern.subtype(clazz)
+this.asInstanceOf[Pattern[T, S]]
+  }
+
+  /**
+* Defines the maximum time interval for a matching pattern. This means 
that the time gap
+* between first and the last event must not be longer than the window 
time.
+*
+* @param windowTime Time of the matching window
+* @return The same pattern operator with the new window length
+*/
+  def within(windowTime: Time): Pattern[T, F] = {
+jPattern.within(windowTime)
+this
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
+* temporal contiguity. This means that the whole pattern only matches 
if an event which matches
+* this operator directly follows the preceding matching event. Thus, 
there cannot be any
+* events in between two matching events.
+*
+* @param name Name of the new pattern operator
+* @return A new pattern operator which is appended to this pattern 
operator
+*/
+  def next(name: String): Pattern[T, T] = {
+wrapPattern(jPattern.next(name))
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces
+* non-strict temporal contiguity. This means that a matching 

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249831#comment-15249831
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60401794
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
+
+  /**
+*
+* @return Window length in which the pattern match has to occur
+*/
+  def getWindowTime: Option[Time] = {
+val time = jPattern.getWindowTime
+if (time == null) None else Some(time)
+  }
+
+  /**
+*
+* @return Filter condition for an event to be matched
+*/
+  def getFilterFunction: Option[FilterFunction[F]] = {
+val filterFun = jPattern.getFilterFunction
+if (filterFun == null) None else Some(filterFun)
+  }
+
+  /**
+* Applies a subtype constraint on the current pattern operator. This 
means that an event has
+* to be of the given subtype in order to be matched.
+*
+* @param clazz Class of the subtype
+* @tparam S Type of the subtype
+* @return The same pattern operator with the new subtype constraint
+*/
+  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
+jPattern.subtype(clazz)
+this.asInstanceOf[Pattern[T, S]]
+  }
+
+  /**
+* Defines the maximum time interval for a matching pattern. This means 
that the time gap
+* between first and the last event must not be longer than the window 
time.
+*
+* @param windowTime Time of the matching window
+* @return The same pattern operator with the new window length
+*/
+  def within(windowTime: Time): Pattern[T, F] = {
+jPattern.within(windowTime)
+this
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
+* temporal contiguity. This means that the whole pattern only matches 
if an event which matches
+* this operator directly follows the preceding matching event. Thus, 
there cannot be any
+* events in between two matching events.
+*
+* @param name Name of the new pattern operator
+* @return A new pattern operator which is appended to this pattern 
operator
+*/
+  def next(name: String): Pattern[T, T] = {
+wrapPattern(jPattern.next(name))
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces
+* non-strict temporal contiguity. This means that a matching 

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249828#comment-15249828
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60401470
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
--- End diff --

Code examples in Scaladoc comments use `{{{ code }}}`.
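
For illustration, a Scaladoc comment with a triple-brace code block could look roughly like this. The `Pattern` class is a hypothetical stand-in that only exists to make the snippet compile:

```scala
object ScaladocSketch {
  /**
    * Base class for a pattern definition.
    *
    * {{{
    * val pattern = Pattern.begin("start")
    *   .next("middle")
    *   .followedBy("end")
    * }}}
    *
    * @param name Name of the pattern operator
    */
  class Pattern(val name: String) {
    // Each call returns a new operator carrying the given name.
    def next(name: String): Pattern = new Pattern(name)
    def followedBy(name: String): Pattern = new Pattern(name)
  }

  object Pattern {
    def begin(name: String): Pattern = new Pattern(name)
  }
}
```

Note that Scaladoc also uses `[[...]]` for links instead of Javadoc's `@link` tag.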




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249819#comment-15249819
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60400970
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.cep.{PatternStream => JPatternStream}
+
+import _root_.scala.reflect.ClassTag
+
+package object scala {
+
+  /**
+* Utility method to wrap { @link org.apache.flink.cep.PatternStream}
+* for usage with the Scala API.
+*
+* @param javaPatternStream The underlying pattern stream from the Java API
+* @tparam T Type of the events
+* @return A pattern stream from the Scala API which wraps a pattern stream from the Java API
+*/
+  private[flink] def wrapPatternStream[T: TypeInformation : ClassTag](javaPatternStream: JPatternStream[T])
+  : scala.PatternStream[T] = {
+javaPatternStream match {
+  case p: JPatternStream[T] => PatternStream[T](p)
+  case _ => null
--- End diff --

I think we should throw an `Exception` if we encounter a `null` value here. 
Otherwise, the subsequent operations would throw a `NullPointerException`. 
Moreover, there is no good strategy for continuing with a `null` pattern 
stream.

The most Scala-esque way would be:

```
Option(javaPatternStream) match {
  case Some(p) => PatternStream[T](p)
  case None => throw new Exception(...)
}
```
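
A self-contained version of the same idea, as a sketch only. The two stream classes are hypothetical stand-ins for the Java and Scala `PatternStream`, and both the exception type and the message are assumptions:

```scala
object WrapStreamSketch {
  // Hypothetical stand-ins (not the real Flink classes).
  class JPatternStream[T](val events: List[T])
  class PatternStream[T](val jPatternStream: JPatternStream[T])

  // Option(x) is None when x is null, so the null case becomes explicit.
  def wrapPatternStream[T](javaPatternStream: JPatternStream[T]): PatternStream[T] =
    Option(javaPatternStream) match {
      case Some(p) => new PatternStream[T](p)
      case None =>
        throw new IllegalArgumentException("The pattern stream from the Java API is null.")
    }
}
```

Compared to matching on the type and falling through to `case _`, this makes it obvious to the reader that `null` is the only input being rejected.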




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249814#comment-15249814
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60400262
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternSelectFunction, PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+import scala.reflect.ClassTag
+
+/**
+  * Stream abstraction for CEP pattern detection. A pattern stream is a 
stream which emits detected
+  * pattern sequences as a map of events associated with their names. The 
pattern is detected using a
+  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the 
detected sequences, the user
+  * has to specify a { @link PatternSelectFunction} or a { @link 
PatternFlatSelectFunction}.
+  *
+  * @param jPatternStream Underlying pattern stream from Java API
+  * @tparam T Type of the events
+  */
+class PatternStream[T: TypeInformation : ClassTag](jPatternStream: 
JPatternStream[T]) {
+
+  private[flink] def getWrappedPatternStream = jPatternStream
+
+  /**
+* Applies a select function to the detected pattern sequence. For each 
pattern sequence the
+* provided { @link PatternSelectFunction} is called. The pattern 
select function can produce
+* exactly one resulting element.
+*
+* @param patternSelectFunction The pattern select function which is 
called for each detected
+*  pattern sequence.
+* @tparam R Type of the resulting elements
+* @return { @link DataStream} which contains the resulting elements 
from the pattern select
+* unction.
+*/
+  def select[R: TypeInformation : ClassTag](patternSelectFunction: 
PatternSelectFunction[T, R]): DataStream[R] = {
+asScalaStream(jPatternStream.select(patternSelectFunction, 
implicitly[TypeInformation[R]]))
+  }
+
+  /**
+* Applies a flat select function to the detected pattern sequence. For 
each pattern sequence
+* the provided { @link PatternFlatSelectFunction} is called. The 
pattern flat select function
+* can produce an arbitrary number of resulting elements.
+*
+* @param patternFlatSelectFunction The pattern flat select function 
which is called for each
+*  detected pattern sequence.
+* @tparam R Type of the resulting elements
+* @return { @link DataStream} which contains the resulting elements 
from the pattern flat select
+* function.
+*/
+  def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFunction: 
PatternFlatSelectFunction[T, R]): DataStream[R] = {
+asScalaStream(jPatternStream.flatSelect(patternFlatSelectFunction, 
implicitly[TypeInformation[R]]))
+  }
+
+  /**
+* Applies a select function to the detected pattern sequence. For each 
pattern sequence the
+* provided { @link PatternSelectFunction} is called. The pattern 
select function can produce
+* exactly one resulting element.
+*
+* @param patternSelectFun The pattern select function which is called 
for each detected
+* pattern sequence.
+* @tparam R Type of the resulting elements
+* @return { @link DataStream} which contains the resulting elements 
from the pattern select
+* function.
+*/
+  def select[R: TypeInformation : 

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249797#comment-15249797
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60399432
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternSelectFunction, PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+import scala.reflect.ClassTag
+
+/**
+  * Stream abstraction for CEP pattern detection. A pattern stream is a 
stream which emits detected
+  * pattern sequences as a map of events associated with their names. The 
pattern is detected using a
+  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the 
detected sequences, the user
+  * has to specify a { @link PatternSelectFunction} or a { @link 
PatternFlatSelectFunction}.
+  *
+  * @param jPatternStream Underlying pattern stream from Java API
+  * @tparam T Type of the events
+  */
+class PatternStream[T: TypeInformation : ClassTag](jPatternStream: 
JPatternStream[T]) {
+
+  private[flink] def getWrappedPatternStream = jPatternStream
+
+  /**
+* Applies a select function to the detected pattern sequence. For each 
pattern sequence the
+* provided { @link PatternSelectFunction} is called. The pattern 
select function can produce
+* exactly one resulting element.
+*
+* @param patternSelectFunction The pattern select function which is 
called for each detected
+*  pattern sequence.
+* @tparam R Type of the resulting elements
+* @return { @link DataStream} which contains the resulting elements 
from the pattern select
+* unction.
+*/
+  def select[R: TypeInformation : ClassTag](patternSelectFunction: 
PatternSelectFunction[T, R]): DataStream[R] = {
+asScalaStream(jPatternStream.select(patternSelectFunction, 
implicitly[TypeInformation[R]]))
+  }
+
+  /**
+* Applies a flat select function to the detected pattern sequence. For 
each pattern sequence
+* the provided { @link PatternFlatSelectFunction} is called. The 
pattern flat select function
+* can produce an arbitrary number of resulting elements.
+*
+* @param patternFlatSelectFunction The pattern flat select function 
which is called for each
+*  detected pattern sequence.
+* @tparam R Type of the resulting elements
+* @return { @link DataStream} which contains the resulting elements 
from the pattern flat select
+* function.
+*/
+  def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFunction: 
PatternFlatSelectFunction[T, R]): DataStream[R] = {
+asScalaStream(jPatternStream.flatSelect(patternFlatSelectFunction, 
implicitly[TypeInformation[R]]))
+  }
+
+  /**
+* Applies a select function to the detected pattern sequence. For each 
pattern sequence the
+* provided { @link PatternSelectFunction} is called. The pattern 
select function can produce
+* exactly one resulting element.
+*
+* @param patternSelectFun The pattern select function which is called 
for each detected
+* pattern sequence.
+* @tparam R Type of the resulting elements
+* @return { @link DataStream} which contains the resulting elements 
from the pattern select
+* function.
+*/
+  def select[R: TypeInformation : 

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249796#comment-15249796
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60399384
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternSelectFunction, PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+import scala.reflect.ClassTag
+
+/**
+  * Stream abstraction for CEP pattern detection. A pattern stream is a 
stream which emits detected
+  * pattern sequences as a map of events associated with their names. The 
pattern is detected using a
+  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the 
detected sequences, the user
+  * has to specify a { @link PatternSelectFunction} or a { @link 
PatternFlatSelectFunction}.
+  *
+  * @param jPatternStream Underlying pattern stream from Java API
+  * @tparam T Type of the events
+  */
+class PatternStream[T: TypeInformation : ClassTag](jPatternStream: 
JPatternStream[T]) {
+
+  private[flink] def getWrappedPatternStream = jPatternStream
+
+  /**
+* Applies a select function to the detected pattern sequence. For each 
pattern sequence the
+* provided { @link PatternSelectFunction} is called. The pattern 
select function can produce
+* exactly one resulting element.
+*
+* @param patternSelectFunction The pattern select function which is 
called for each detected
+*  pattern sequence.
+* @tparam R Type of the resulting elements
+* @return { @link DataStream} which contains the resulting elements 
from the pattern select
+* function.
+*/
+  def select[R: TypeInformation : ClassTag](patternSelectFunction: 
PatternSelectFunction[T, R]): DataStream[R] = {
--- End diff --

The `ClassTag` context bound is not needed here.
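
To illustrate the point about the unused context bound, here is a minimal, self-contained sketch. `TypeInfo` and `describe` are hypothetical stand-ins (assumptions for illustration, so the snippet runs without Flink on the classpath); this is not the code from the pull request.

```scala
object ContextBoundSketch {
  // Hypothetical stand-in for Flink's TypeInformation (an assumption for illustration).
  trait TypeInfo[T] { def name: String }

  object TypeInfo {
    implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { val name = "Int" }
    implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { val name = "String" }
  }

  // Only the context bound whose implicit is actually consumed is declared.
  // Adding `: ClassTag` would demand a second implicit that the body never uses.
  def describe[R: TypeInfo](value: R): String =
    s"$value has type ${implicitly[TypeInfo[R]].name}"
}
```

A context bound such as `R: TypeInfo` is shorthand for an extra implicit parameter list, so every bound that is declared has to be satisfied at each call site, whether or not the method body uses it.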


> Scala API for CEP
> -
>
> Key: FLINK-3708
> URL: https://issues.apache.org/jira/browse/FLINK-3708
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Stefan Richter
>
> Currently, the CEP library does not support Scala case classes because the 
> {{TypeExtractor}} cannot handle them. To support them, it would be 
> necessary to offer a Scala API for the CEP library.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249792#comment-15249792
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60399207
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternSelectFunction, PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+import scala.reflect.ClassTag
+
+/**
+  * Stream abstraction for CEP pattern detection. A pattern stream is a 
stream which emits detected
+  * pattern sequences as a map of events associated with their names. The 
pattern is detected using a
+  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the 
detected sequences, the user
+  * has to specify a { @link PatternSelectFunction} or a { @link 
PatternFlatSelectFunction}.
+  *
+  * @param jPatternStream Underlying pattern stream from Java API
+  * @tparam T Type of the events
+  */
+class PatternStream[T: TypeInformation : ClassTag](jPatternStream: 
JPatternStream[T]) {
--- End diff --

The `ClassTag` context bound is not needed here.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249791#comment-15249791
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60399173
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.cep.{CEP => JCEP}
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.reflect.ClassTag
+
+/**
+  * Utility method to transform a { @link DataStream} into a { @link 
PatternStream} to do CEP.
+  */
+
+object CEP {
+  /**
+* Transforms a { @link DataStream[T]} into a { @link PatternStream[T]} 
in the Scala API.
+* See { @link org.apache.flink.cep.CEP} for a more detailed 
description how the underlying
+* Java API works.
--- End diff --

The links in these comments should use ScalaDoc syntax.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249788#comment-15249788
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60399038
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.cep.{CEP => JCEP}
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.reflect.ClassTag
+
+/**
+  * Utility method to transform a { @link DataStream} into a { @link 
PatternStream} to do CEP.
--- End diff --

In ScalaDoc, you link to a type using the syntax `[[DataStream]]`.
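
As a rough sketch of what such a comment could look like, the snippet below documents a small method with `[[...]]` links. `EventStream` and `first` are hypothetical names invented for this example (they are not part of Flink), so the block compiles on its own.

```scala
object ScalaDocSketch {
  /** Hypothetical stand-in for a stream type; it only holds elements in memory. */
  class EventStream[T](val elements: Seq[T])

  /**
    * Returns the first element of an [[EventStream]], if there is one.
    *
    * The wiki-style `[[...]]` form is ScalaDoc's native link syntax and is used
    * here in place of the Javadoc-style `{@link ...}` tag.
    *
    * @param input the [[EventStream]] to inspect
    * @tparam T type of the elements
    * @return the first element wrapped in an [[scala.Option]]
    */
  def first[T](input: EventStream[T]): Option[T] = input.elements.headOption
}
```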




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249785#comment-15249785
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60398887
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.cep.{CEP => JCEP}
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.reflect.ClassTag
+
+/**
+  * Utility method to transform a { @link DataStream} into a { @link 
PatternStream} to do CEP.
+  */
+
+object CEP {
+  /**
+* Transforms a { @link DataStream[T]} into a { @link PatternStream[T]} 
in the Scala API.
+* See { @link org.apache.flink.cep.CEP} for a more detailed 
description how the underlying
+* Java API works.
+*
+* @param input   DataStream containing the input events
+* @param pattern Pattern specification which shall be detected
+* @tparam T Type of the input events
+* @return Resulting pattern stream
+*/
+  def pattern[T: TypeInformation : ClassTag](input: DataStream[T], 
pattern: Pattern[T, _]): PatternStream[T] = {
--- End diff --

The `ClassTag` context bound is not needed here.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249783#comment-15249783
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60398671
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+org.apache.flink
+flink-libraries
+1.1-SNAPSHOT
+..
+
+flink-cep-scala_2.10
+flink-cep-scala
+jar
+
+
+
+org.apache.flink
+flink-scala_2.10
+${project.version}
+   provided
+
+
+org.apache.flink
+flink-clients_2.10
+${project.version}
+   provided
+
+
+org.apache.flink
+flink-cep_2.10
+${project.version}
+
+
+org.apache.flink
+flink-streaming-java_2.10
+${project.version}
+provided
+
+
+org.apache.flink
+flink-streaming-scala_2.10
+${project.version}
+provided
+
+
+org.apache.flink
+flink-tests_2.10
+${project.version}
+test
+test-jar
+
+
+org.apache.flink
+flink-test-utils_2.10
+${project.version}
+test
+
+
+org.apache.flink
+flink-streaming-java_2.10
+${project.version}
+test
+test-jar
+
+
+org.apache.flink
+flink-streaming-scala_2.10
+${project.version}
+test
+test-jar
+
+
+
+
+
+
+
+net.alchim31.maven
+scala-maven-plugin
+3.1.4
+
+
+
+scala-compile-first
+process-resources
+
+compile
+
+
+
+
+
+scala-test-compile
+process-test-resources
+
+testCompile
+
+
+
+
+
+-Xms128m
+-Xmx512m
+
+
+
+org.scalamacros
+
paradise_${scala.version}
+${scala.macros.version}
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-eclipse-plugin
+2.8
+
+true
+
+
org.scala-ide.sdt.core.scalanature
+
org.eclipse.jdt.core.javanature
+
+
+
org.scala-ide.sdt.core.scalabuilder
+
+
+
org.scala-ide.sdt.launching.SCALA_CONTAINER
+
org.eclipse.jdt.launching.JRE_CONTAINER
+
+
+org.scala-lang:scala-library
+org.scala-lang:scala-compiler
+
+
+**/*.scala
+**/*.java
+
+
+
+
+
+
+org.codehaus.mojo
+build-helper-maven-plugin
+1.7
+
+
+
+add-source
+

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249782#comment-15249782
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60398573
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+org.apache.flink
+flink-libraries
+1.1-SNAPSHOT
+..
+
+flink-cep-scala_2.10
+flink-cep-scala
+jar
+
+
+
+org.apache.flink
+flink-scala_2.10
+${project.version}
+   provided
+
+
+org.apache.flink
+flink-clients_2.10
+${project.version}
+   provided
+
+
+org.apache.flink
+flink-cep_2.10
+${project.version}
+
+
+org.apache.flink
+flink-streaming-java_2.10
+${project.version}
+provided
+
+
+org.apache.flink
+flink-streaming-scala_2.10
+${project.version}
+provided
+
+
+org.apache.flink
+flink-tests_2.10
+${project.version}
+test
+test-jar
+
+
+org.apache.flink
+flink-test-utils_2.10
+${project.version}
+test
+
+
+org.apache.flink
+flink-streaming-java_2.10
+${project.version}
+test
+test-jar
+
+
+org.apache.flink
+flink-streaming-scala_2.10
+${project.version}
+test
+test-jar
+
+
+
+
+
+
+
+net.alchim31.maven
+scala-maven-plugin
+3.1.4
+
+
+
+scala-compile-first
+process-resources
+
+compile
+
+
+
+
+
+scala-test-compile
+process-test-resources
+
+testCompile
+
+
+
+
+
+-Xms128m
+-Xmx512m
+
+
+
+org.scalamacros
+
paradise_${scala.version}
+${scala.macros.version}
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-eclipse-plugin
+2.8
+
+true
+
+
org.scala-ide.sdt.core.scalanature
+
org.eclipse.jdt.core.javanature
+
+
+
org.scala-ide.sdt.core.scalabuilder
+
+
+
org.scala-ide.sdt.launching.SCALA_CONTAINER
+
org.eclipse.jdt.launching.JRE_CONTAINER
+
+
+org.scala-lang:scala-library
+org.scala-lang:scala-compiler
+
+
+**/*.scala
+**/*.java
+
+
+
--- End diff --

I'm not sure how relevant this is anymore.



[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249780#comment-15249780
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60398456
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+org.apache.flink
+flink-libraries
+1.1-SNAPSHOT
+..
+
+flink-cep-scala_2.10
+flink-cep-scala
+jar
+
+
+
+org.apache.flink
+flink-scala_2.10
+${project.version}
+   provided
+
+
+org.apache.flink
+flink-clients_2.10
+${project.version}
+   provided
+
+
+org.apache.flink
+flink-cep_2.10
+${project.version}
+
+
+org.apache.flink
+flink-streaming-java_2.10
+${project.version}
+provided
+
+
+org.apache.flink
+flink-streaming-scala_2.10
+${project.version}
+provided
+
+
+org.apache.flink
+flink-tests_2.10
+${project.version}
+test
+test-jar
+
+
+org.apache.flink
+flink-test-utils_2.10
+${project.version}
+test
+
+
+org.apache.flink
+flink-streaming-java_2.10
+${project.version}
+test
+test-jar
+
+
+org.apache.flink
+flink-streaming-scala_2.10
+${project.version}
+test
+test-jar
+
+
+
+
+
+
+
+net.alchim31.maven
+scala-maven-plugin
+3.1.4
+
+
+
+scala-compile-first
+process-resources
+
+compile
+
+
+
+
+
+scala-test-compile
+process-test-resources
+
+testCompile
+
+
+
+
+
+-Xms128m
+-Xmx512m
+
+
+
+org.scalamacros
+
paradise_${scala.version}
+${scala.macros.version}
+
+
+
+
--- End diff --

I think you can replace this with

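```
<plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <version>2.15.2</version>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <sourceDir>src/main/scala</sourceDir>
        <testSourceDir>src/test/scala</testSourceDir>
        <jvmArgs>
            <jvmArg>-Xms64m</jvmArg>
            <jvmArg>-Xmx1024m</jvmArg>
        </jvmArgs>
    </configuration>
</plugin>
```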

This should work because we don't have a mixed Java/Scala project here.



[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249759#comment-15249759
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60397159
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+
+
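+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <artifactId>flink-cep-scala_2.10</artifactId>
+    <name>flink-cep-scala</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cep_2.10</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>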
--- End diff --

This dependency is not needed.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249755#comment-15249755
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60397035
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+
+
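+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <artifactId>flink-cep-scala_2.10</artifactId>
+    <name>flink-cep-scala</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>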

This dependency is not needed.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249756#comment-15249756
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60397042
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+
+
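+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <artifactId>flink-cep-scala_2.10</artifactId>
+    <name>flink-cep-scala</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>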

This dependency is not needed.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249719#comment-15249719
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60394562
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+object StreamEvent {
+  def of[V](event: V, timestamp: Long): StreamEvent[V] = {
+new StreamEvent[V](event, timestamp)
+  }
+}
+
+class StreamEvent[T](val event: T, val timestamp: Long) {
--- End diff --

`StreamEvent` could be defined as a case class. Then you don't have to 
define the factory methods.
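
A minimal sketch of that suggestion, assuming the same two fields as the class in the diff; the wrapper object and the sample values are only there to make the snippet self-contained:

```scala
object StreamEventSketch {
  // A case class gets a companion `apply`, so no hand-written `of` factory is needed.
  // It also gets structural equality, `hashCode`, `toString`, and `copy`.
  case class StreamEvent[T](event: T, timestamp: Long)

  // Construction without `new` and without a custom factory method.
  val first: StreamEvent[String] = StreamEvent("start", 1L)

  // `copy` creates a modified instance while leaving the original untouched.
  val later: StreamEvent[String] = first.copy(timestamp = 2L)
}
```

Because the constructor parameters of a case class are public `val`s by default, the explicit `val` modifiers from the original class can be dropped as well.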




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249717#comment-15249717
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60394471
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+object StreamEvent {
+  def of[V](event: V, timestamp: Long): StreamEvent[V] = {
+new StreamEvent[V](event, timestamp)
+  }
+}
+
+class StreamEvent[T](val event: T, val timestamp: Long) {
+
--- End diff --

These line breaks are unnecessary.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249712#comment-15249712
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60394116
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java 
---
@@ -61,19 +61,36 @@
public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
// we have to extract the output type from the provided pattern selection function manually
// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
-   TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
-   patternSelectFunction,
-   PatternSelectFunction.class,
-   1,
-   -1,
-   inputType,
-   null,
-   false);
 
+   TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
+   patternSelectFunction,
+   PatternSelectFunction.class,
+   1,
+   -1,
+   inputType,
+   null,
+   false);
--- End diff --

The indentation is off here.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249708#comment-15249708
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60393928
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java 
---
@@ -99,10 +116,26 @@
null,
false);
 
+   return flatSelect(patternFlatSelectFunction, outTypeInfo);
+   }
+
+   /**
+* Applies a flat select function to the detected pattern sequence. For each pattern sequence
+* the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
+* can produce an arbitrary number of resulting elements.
+*
+* @param patternFlatSelectFunction The pattern flat select function which is called for each
+*  detected pattern sequence.
+* @param <R> Type of the resulting elements
+* @param outTypeInfo Explicit specification of output type.
+* @return {@link DataStream} which contains the resulting elements from the pattern flat select
+* function.
+*/
+   public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
return patternStream.flatMap(
-   new PatternFlatSelectMapper<T, R>(
-   patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
-   )).returns(outTypeInfo);
+   new PatternFlatSelectMapper<T, R>(
+   patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
+   )).returns(outTypeInfo);
}
--- End diff --

Indentation off in this file.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249609#comment-15249609
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60383831
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
+
+  /**
+*
+* @return Window length in which the pattern match has to occur
+*/
+  def getWindowTime: Option[Time] = {
+val time = jPattern.getWindowTime
+if (time == null) None else Some(time)
+  }
+
+  /**
+*
+* @return Filter condition for an event to be matched
+*/
+  def getFilterFunction: Option[FilterFunction[F]] = {
+val filterFun = jPattern.getFilterFunction
+if (filterFun == null) None else Some(filterFun)
+  }
+
+  /**
+* Applies a subtype constraint on the current pattern operator. This 
means that an event has
+* to be of the given subtype in order to be matched.
+*
+* @param clazz Class of the subtype
+* @tparam S Type of the subtype
+* @return The same pattern operator with the new subtype constraint
+*/
+  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
+jPattern.subtype(clazz)
+this.asInstanceOf[Pattern[T, S]]
+  }
+
+  /**
+* Defines the maximum time interval for a matching pattern. This means 
that the time gap
+* between first and the last event must not be longer than the window 
time.
+*
+* @param windowTime Time of the matching window
+* @return The same pattern operator with the new window length
+*/
+  def within(windowTime: Time): Pattern[T, F] = {
+jPattern.within(windowTime)
+this
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
+* temporal contiguity. This means that the whole pattern only matches 
if an event which matches
+* this operator directly follows the preceding matching event. Thus, 
there cannot be any
+* events in between two matching events.
+*
+* @param name Name of the new pattern operator
+* @return A new pattern operator which is appended to this pattern 
operator
+*/
+  def next(name: String): Pattern[T, T] = {
+wrapPattern(jPattern.next(name))
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces
+* non-strict temporal contiguity. This means that a matching 

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249608#comment-15249608
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60383704
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
+
+  /**
+*
+* @return Window length in which the pattern match has to occur
+*/
+  def getWindowTime: Option[Time] = {
+val time = jPattern.getWindowTime
+if (time == null) None else Some(time)
+  }
+
+  /**
+*
+* @return Filter condition for an event to be matched
+*/
+  def getFilterFunction: Option[FilterFunction[F]] = {
+val filterFun = jPattern.getFilterFunction
+if (filterFun == null) None else Some(filterFun)
+  }
+
+  /**
+* Applies a subtype constraint on the current pattern operator. This 
means that an event has
+* to be of the given subtype in order to be matched.
+*
+* @param clazz Class of the subtype
+* @tparam S Type of the subtype
+* @return The same pattern operator with the new subtype constraint
+*/
+  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
+jPattern.subtype(clazz)
+this.asInstanceOf[Pattern[T, S]]
+  }
+
+  /**
+* Defines the maximum time interval for a matching pattern. This means 
that the time gap
+* between first and the last event must not be longer than the window 
time.
+*
+* @param windowTime Time of the matching window
+* @return The same pattern operator with the new window length
+*/
+  def within(windowTime: Time): Pattern[T, F] = {
+jPattern.within(windowTime)
+this
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
+* temporal contiguity. This means that the whole pattern only matches 
if an event which matches
+* this operator directly follows the preceding matching event. Thus, 
there cannot be any
+* events in between two matching events.
+*
+* @param name Name of the new pattern operator
+* @return A new pattern operator which is appended to this pattern 
operator
+*/
+  def next(name: String): Pattern[T, T] = {
+wrapPattern(jPattern.next(name))
+  }
+
+  /**
+* Appends a new pattern operator to the existing one. The new pattern 
operator enforces
+* non-strict temporal contiguity. This means that a matching 

[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249605#comment-15249605
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60383389
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
+
+  /**
+*
+* @return Window length in which the pattern match has to occur
+*/
+  def getWindowTime: Option[Time] = {
+val time = jPattern.getWindowTime
+if (time == null) None else Some(time)
+  }
+
+  /**
+*
+* @return Filter condition for an event to be matched
+*/
+  def getFilterFunction: Option[FilterFunction[F]] = {
+val filterFun = jPattern.getFilterFunction
+if (filterFun == null) None else Some(filterFun)
--- End diff --

Same here




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249599#comment-15249599
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60383097
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
+
+  /**
+*
+* @return Window length in which the pattern match has to occur
+*/
+  def getWindowTime: Option[Time] = {
+val time = jPattern.getWindowTime
+if (time == null) None else Some(time)
--- End diff --

It's easier to simply write `Option(jPattern.getWindowTime())`
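
To illustrate the suggestion, here is a minimal sketch. It assumes a hypothetical Java-style getter that may return null; `OptionWrapping` and `javaStyleGetter` are illustrative names and not part of the pull request. `Option.apply` returns `None` for null and `Some(value)` otherwise, so the explicit null check becomes redundant.

```scala
object OptionWrapping {
  // Hypothetical stand-in for a Java getter such as jPattern.getWindowTime,
  // which returns null when no value has been set.
  def javaStyleGetter(isSet: Boolean): String = if (isSet) "10 s" else null

  // Explicit null check, as in the reviewed code.
  def verbose(isSet: Boolean): Option[String] = {
    val value = javaStyleGetter(isSet)
    if (value == null) None else Some(value)
  }

  // Option.apply performs the same null check internally.
  def concise(isSet: Boolean): Option[String] = Option(javaStyleGetter(isSet))
}
```

Both variants return the same result for set and unset values; the concise form just drops the temporary variable and the branch.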




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249598#comment-15249598
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60382954
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  * 
+  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+  * a { @link NFA}.
+  *
+  * { @code
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * 
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+
--- End diff --

Two line breaks.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-20 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249561#comment-15249561
 ] 

Aljoscha Krettek commented on FLINK-3708:
-

I think it was required because ASM is shaded, so we cannot use the dependency 
management for it. It was some weird problem that went away when ASM was explicitly 
included in gelly...



[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247487#comment-15247487
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-211843938
  
It looks like it was added as part of 
[FLINK-3136](https://issues.apache.org/jira/browse/FLINK-3136).




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247478#comment-15247478
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-211840791
  
Does anybody know why we needed to add the asm dependency for gelly scala? 
@tillrohrmann @vasia ?




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247413#comment-15247413
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-211819580
  
Thanks Ufuk! I just saw that the build actually failed, and it seems to me 
that the problem is indeed related to removing the asm dependency from the 
pom.xml, as Robert suggested. So I could either add the dependency again, or do 
you see any other way to solve this?

There is one remaining question, which I already started to discuss with Till, 
about the select functions in PatternStream. The Java API wants to call them 
with java.util.Map as parameters, but I don't think we want Java classes in our 
Scala API. Currently I see two options for changing this without touching the 
Java API:

1) Use Scala mutable.Map and automatic conversion.
2) Use Scala immutable.Map and explicit conversion. From how I interpret 
the actual use of the Map in this function, the semantics are actually those of 
an immutable map.

If the impact of the conversion is not an issue, I have already implemented 
version 2 (it is currently commented out in favor of a version that still uses 
Java Map).
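
The two options can be sketched roughly as follows. This is only an illustration under assumptions: `MapConversion`, `asMutableView` and `asImmutableCopy` are hypothetical names, and both variants use the explicit `asScala` decorator from `scala.collection.JavaConverters` rather than the implicit conversion mentioned in option 1. On Scala 2.13 and later the same calls are also available through `scala.jdk.CollectionConverters`.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable

object MapConversion {
  // Option 1: wrap the Java map in a mutable Scala view.
  // No copy is made; later changes to the Java map are visible through the view.
  def asMutableView[K, V](jMap: JMap[K, V]): mutable.Map[K, V] = jMap.asScala

  // Option 2: copy the entries into an immutable Scala map.
  // The copy is independent of the Java map after the call returns.
  def asImmutableCopy[K, V](jMap: JMap[K, V]): Map[K, V] = jMap.asScala.toMap
}
```

Option 1 avoids copying but leaks mutability into the Scala API. Option 2 costs one copy per call, which is the conversion impact in question.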





[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247350#comment-15247350
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-211784417
  
Thank you for this great contribution! I think that CEP and a concise Scala 
API go very well together. :-) I'm not a Scala expert, but I'll also have a 
look later.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246128#comment-15246128
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60102619
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => 
JFollowedByPattern, Pattern => JPattern}
+
+import scala.reflect.ClassTag
+
+
+object FollowedByPattern {
+  def apply[T : ClassTag, F <: T : ClassTag]
+  (jfbPattern: JFollowedByPattern[T, F]) = new FollowedByPattern[T, 
F](jfbPattern)
+}
+
+/**
+  * Created by stefan on 16.04.16.
--- End diff --

Can you remove this?




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246165#comment-15246165
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60104038
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => 
JFollowedByPattern, Pattern => JPattern}
+
+import scala.reflect.ClassTag
+
+
+object FollowedByPattern {
+  def apply[T : ClassTag, F <: T : ClassTag]
+  (jfbPattern: JFollowedByPattern[T, F]) = new FollowedByPattern[T, 
F](jfbPattern)
+}
+
+/**
+  * Created by stefan on 16.04.16.
--- End diff --

Sure. Thought I already removed all of it.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246163#comment-15246163
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60103959
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,216 @@
+
+
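+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+<parent>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-libraries</artifactId>
+<version>1.1-SNAPSHOT</version>
+<relativePath>..</relativePath>
+</parent>
+<artifactId>flink-cep-scala_2.10</artifactId>
+<name>flink-cep-scala</name>
+<packaging>jar</packaging>
+
+<dependencies>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-scala_2.10</artifactId>
+<version>${project.version}</version>
+   <scope>provided</scope>
+</dependency>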
+
--- End diff --

I initially took the pom.xml from gelly-scala as a blueprint and adopted the 
line from there; I did not run into any particular problem myself.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15246119#comment-15246119
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60102211
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,216 @@
+
+
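+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <artifactId>flink-cep-scala_2.10</artifactId>
+    <name>flink-cep-scala</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>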
+
--- End diff --

What exactly was the problem here? Ideally the module inherits the shading 
plugin from the parent and everything is shaded correctly.




[jira] [Commented] (FLINK-3708) Scala API for CEP

2016-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245917#comment-15245917
 ] 

ASF GitHub Bot commented on FLINK-3708:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/1905

[FLINK-3708] Scala API for CEP (initial).


The new flink-cep-scala module adds a Scala counterpart to the Java CEP API in 
Flink.

I created Scala classes for Pattern and PatternStream, helper classes, as 
well as corresponding tests.

PatternStream in flink-cep is extended so that it can receive explicit 
TypeInformation from Scala code.
In Pattern in flink-cep, the type parameters of the member previous are 
slightly modified.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink dev-cep-scala

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1905.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1905


commit 306210da82c94f6770e22690799a3f1af9a877e7
Author: Stefan Richter 
Date:   2016-04-18T11:10:09Z

Preparations in Pattern and PatternStream of flink-cep to support 
flink-cep-scala functionality.

commit 731d96ae78c4dc1b7125966900886225374a3eb5
Author: Stefan Richter 
Date:   2016-04-18T11:10:35Z

Initial commit for flink-cep-scala API.

commit 7da7c14787226d9aa5914a138f632b8b6d5f1335
Author: Stefan Richter 
Date:   2016-04-18T13:48:51Z

[FLINK-3708] Added missing test dependency to pom.

commit 1468bd91aea7d2a757ddeea8b8506b5ad9f3d289
Author: Stefan Richter 
Date:   2016-04-18T14:50:53Z

[FLINK-3708] Pattern in Scala API now uses Option to shield users against 
null values from the Java API

commit 9ad8719c5b8767ec2fb0425f55cc7df105e36bcb
Author: Stefan Richter 
Date:   2016-04-18T16:04:35Z

[FLINK-3708] Added test-jar for build phase to pom.
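
The idea behind the commit that lets the Scala Pattern use Option can be 
illustrated with a minimal, self-contained sketch. JPattern and Pattern below 
are hypothetical stand-ins, not the actual Flink classes; the sketch only 
assumes that a Java-style getter (here getPrevious) may return null and that a 
Scala wrapper converts the result with Option(...).

```scala
// Hypothetical stand-in for a Java-style pattern class whose getter may return null.
// This is NOT the Flink class; it only mimics a nullable "previous" link.
final class JPattern(val name: String, previous: JPattern) {
  def getPrevious: JPattern = previous // null for the first pattern in a chain
}

// Hypothetical Scala wrapper: Option(...) maps a null reference to None,
// so callers never see a raw null coming from the Java side.
final class Pattern(val jPattern: JPattern) {
  def getName: String = jPattern.name

  def getPrevious: Option[Pattern] =
    Option(jPattern.getPrevious).map(p => new Pattern(p))
}

object OptionShieldDemo {
  def main(args: Array[String]): Unit = {
    val start = new Pattern(new JPattern("start", null))
    val next  = new Pattern(new JPattern("next", start.jPattern))

    println(start.getPrevious.map(_.getName)) // None
    println(next.getPrevious.map(_.getName))  // Some(start)
  }
}
```

With such a wrapper, callers pattern-match or map over the result instead of 
checking for null by hand.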



