[jira] [Created] (FLINK-3841) RocksDB statebackend creates empty dbs for stateless operators

2016-04-27 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-3841:
-

 Summary: RocksDB statebackend creates empty dbs for stateless 
operators
 Key: FLINK-3841
 URL: https://issues.apache.org/jira/browse/FLINK-3841
 Project: Flink
  Issue Type: Bug
  Components: state backends
Reporter: Gyula Fora
Priority: Minor


Even though they are not checkpointed, there is always an open RocksDB database 
for all operators if the RocksDB backend is set. I wonder if it would make sense 
to lazily initialize the DBs instead of doing it in the initializeForJob method.
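
A minimal sketch of the lazy-initialization idea (a hypothetical wrapper, not the
actual RocksDB state backend code): the database handle is only created on the first
state access, so operators that never touch state never open a database.

```scala
import org.rocksdb.RocksDB

// Hypothetical sketch, not Flink's backend implementation.
class LazyRocksDbStore(dbPath: String) {

  // lazy val defers RocksDB.open until the first read or write,
  // so stateless operators never create a database on disk.
  private lazy val db: RocksDB = {
    RocksDB.loadLibrary()
    RocksDB.open(dbPath)
  }

  def put(key: Array[Byte], value: Array[Byte]): Unit = db.put(key, value)

  def get(key: Array[Byte]): Array[Byte] = db.get(key)
}
```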





[jira] [Created] (FLINK-3840) RocksDB local parent dir is polluted with empty folders with random names

2016-04-27 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-3840:
-

 Summary: RocksDB local parent dir is polluted with empty folders 
with random names
 Key: FLINK-3840
 URL: https://issues.apache.org/jira/browse/FLINK-3840
 Project: Flink
  Issue Type: Bug
  Components: state backends
Reporter: Gyula Fora


For some reason, when the job starts, the RocksDB root dir is filled with hundreds 
of empty folders with random names like:

041da1c-5fec-42ed-a69c-298240f1a065  4e6061aa-0c69-4755-a1ad-5ac4dec1d3f0  
a7004bd1-778c-4a0f-96d4-9941208d1888
00db8406-6cb4-46ad-aac9-beeaa3247d16





[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261609#comment-15261609
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61378731
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

`exp`, `log`, `pow` and `ln` should have `Double` as input. What I was 
thinking is that we should add some extra `type coercion` rules and insert a `cast` 
when we can do it safely (when an expression expects a `Double` but we provide 
an `Int`), for example `Byte` to `Double`, `Long` to `Double` and so on, and 
rewrite the expression as `pow(cast(x, Double))`.
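
A hedged sketch of such a coercion rule (the `Expr` and `Cast` types below are
hypothetical stand-ins for illustration, not the PR's actual expression classes):

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

// Illustrative only: insert a cast when a Double is expected but a narrower
// numeric type is provided, e.g. rewrite pow(x) as pow(cast(x, Double)).
sealed trait Expr { def dataType: TypeInformation[_] }
case class Literal(value: Any, dataType: TypeInformation[_]) extends Expr
case class Cast(child: Expr, dataType: TypeInformation[_]) extends Expr

object Coercion {
  private val safeToWiden: Set[TypeInformation[_]] = Set(
    BasicTypeInfo.BYTE_TYPE_INFO,
    BasicTypeInfo.SHORT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.LONG_TYPE_INFO,
    BasicTypeInfo.FLOAT_TYPE_INFO)

  /** Widen `e` to Double when that is safe, otherwise return it unchanged. */
  def coerceToDouble(e: Expr): Expr =
    if (e.dataType == BasicTypeInfo.DOUBLE_TYPE_INFO) e
    else if (safeToWiden.contains(e.dataType)) Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO)
    else e
}
```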


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks such a counterpart and its validation is 
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.





[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61378731
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

`exp`, `log`, `pow` and `ln` should have `Double` as input. What I was 
thinking is that we should add some extra `type coercion` rules and insert a `cast` 
when we can do it safely (when an expression expects a `Double` but we provide 
an `Int`), for example `Byte` to `Double`, `Long` to `Double` and so on, and 
rewrite the expression as `pow(cast(x, Double))`.




[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261600#comment-15261600
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61378085
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
+*/
+  def dataType: TypeInformation[_]
+
+  /**
+* One pass validation of the expression tree in post order.
+*/
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+* Check input data types, inputs number or other properties specified 
by this expression.
+* Return `ValidationSuccess` if it pass the check,
+* or `ValidationFailure` with supplement message explaining the error.
+* Note: we should only call this method until `childrenValidated == 
true`
--- End diff --

Ah, yes!


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike SQL string execution, which has a separate validation phase before 
> RelNode construction, the Table API lacks such a counterpart and its validation is 
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as 
> possible.





[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61378085
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
+*/
+  def dataType: TypeInformation[_]
+
+  /**
+* One pass validation of the expression tree in post order.
+*/
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+* Check input data types, inputs number or other properties specified 
by this expression.
+* Return `ValidationSuccess` if it pass the check,
+* or `ValidationFailure` with supplement message explaining the error.
+* Note: we should only call this method until `childrenValidated == 
true`
--- End diff --

Ah, yes!




[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1926#issuecomment-215254090
  
Thanks for the update @dawidwys. PR looks good except for a few minor 
comments.
Can you also rebase to the current master? A recent commit updated some 
code that your PR is touching (`DataSetScan`).

Then it should be good to merge.


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?
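
The `.sortPartition(...).setParallelism(1)` question in the quoted description is
essentially what the PR's `DataSetSort` answers: range-partition on the sort key so
partitions are globally ordered, then sort each partition locally. A rough,
illustrative DataSet API sketch of that idea (not the Table API implementation itself):

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object LocalSortSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data: DataSet[(Int, String)] = env.fromElements((3, "c"), (1, "a"), (2, "b"))

    val sorted = data
      .partitionByRange(0)               // key ranges are ordered across partitions
      .sortPartition(0, Order.ASCENDING) // local sort within each partition

    sorted.print()
  }
}
```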





[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1926#issuecomment-215254090
  
Thanks for the update @dawidwys. PR looks good except for a few minor 
comments.
Can you also rebase to the current master? A recent commit updated some 
code that your PR is touching (`DataSetScan`).

Then it should be good to merge.




[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349569
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  def getExecutionEnvironment = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(4)
+env
+  }
+
+  val tupleDataSetStrings = List((1, 1L, "Hi")
+,(2, 2L, "Hello")
+,(3, 2L, "Hello world")
+,(4, 3L, "Hello world, how are you?")
+,(5, 3L, "I am fine.")
+,(6, 3L, "Luke Skywalker")
+,(7, 4L, "Comment#1")
+,(8, 4L, "Comment#2")
+,(9, 4L, "Comment#3")
+,(10, 4L, "Comment#4")
+,(11, 5L, "Comment#5")
+,(12, 5L, "Comment#6")
+,(13, 5L, "Comment#7")
+,(14, 5L, "Comment#8")
+,(15, 5L, "Comment#9")
+,(16, 6L, "Comment#10")
+,(17, 6L, "Comment#11")
+,(18, 6L, "Comment#12")
+,(19, 6L, "Comment#13")
+,(20, 6L, "Comment#14")
+,(21, 6L, "Comment#15"))
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val ds = CollectionDataSets.get3TupleDataSet(env)
+val t = ds.toTable(tEnv).orderBy('_1.desc)
+implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+  - x.productElement(0).asInstanceOf[Int])
+
+val expected = sortExpectedly(tupleDataSetStrings)
+val results = t.toDataSet[Row].mapPartition(rows => 
Seq(rows.toSeq)).collect()
+
+val result = results.filterNot(_.isEmpty).sortBy(p => 
p.min).reduceLeft(_ ++ _)
--- End diff --

`p.min` should be `p.head` because `p` should already be sorted (the same applies 
to the other tests as well).
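
As a plain Scala illustration of that check (independent of the test harness): each
collected partition is already sorted, so ordering the non-empty partitions by their
first element and concatenating them yields the globally sorted sequence.

```scala
// Each inner Seq models one already-sorted partition collected by the test.
val partitions: Seq[Seq[Int]] = Seq(Seq(7, 8, 9), Seq(1, 2, 3), Seq(4, 5, 6), Seq.empty)

val merged: Seq[Int] = partitions
  .filterNot(_.isEmpty)
  .sortBy(_.head)      // head is enough because every partition is sorted
  .reduceLeft(_ ++ _)  // Seq(1, 2, 3, 4, 5, 6, 7, 8, 9)
```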




[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261106#comment-15261106
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349569
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  def getExecutionEnvironment = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(4)
+env
+  }
+
+  val tupleDataSetStrings = List((1, 1L, "Hi")
+,(2, 2L, "Hello")
+,(3, 2L, "Hello world")
+,(4, 3L, "Hello world, how are you?")
+,(5, 3L, "I am fine.")
+,(6, 3L, "Luke Skywalker")
+,(7, 4L, "Comment#1")
+,(8, 4L, "Comment#2")
+,(9, 4L, "Comment#3")
+,(10, 4L, "Comment#4")
+,(11, 5L, "Comment#5")
+,(12, 5L, "Comment#6")
+,(13, 5L, "Comment#7")
+,(14, 5L, "Comment#8")
+,(15, 5L, "Comment#9")
+,(16, 6L, "Comment#10")
+,(17, 6L, "Comment#11")
+,(18, 6L, "Comment#12")
+,(19, 6L, "Comment#13")
+,(20, 6L, "Comment#14")
+,(21, 6L, "Comment#15"))
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val ds = CollectionDataSets.get3TupleDataSet(env)
+val t = ds.toTable(tEnv).orderBy('_1.desc)
+implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+  - x.productElement(0).asInstanceOf[Int])
+
+val expected = sortExpectedly(tupleDataSetStrings)
+val results = t.toDataSet[Row].mapPartition(rows => 
Seq(rows.toSeq)).collect()
+
+val result = results.filterNot(_.isEmpty).sortBy(p => 
p.min).reduceLeft(_ ++ _)
--- End diff --

`p.min` should be `p.head` because `p` should already be sorted (the same applies 
to the other tests as well).


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?





[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349399
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
+
+package object dataset {
+
+  private[dataset] def getConversionMapper(config: TableConfig,
+   inputType: TypeInformation[Any],
+   expectedType: 
TypeInformation[Any],
+   conversionOperatorName: String,
+   fieldNames: Seq[String],
+   inputPojoFieldMapping: 
Option[Array[Int]] = None)
+  : MapFunction[Any, Any] = {
+
+val generator = new CodeGenerator(config,
--- End diff --

move first parameter to next line




[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349443
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSort}
+
+class DataSetSortRule
+  extends ConverterRule(
+classOf[LogicalSort],
+Convention.NONE,
+DataSetConvention.INSTANCE,
+"FlinkSortRule") {
--- End diff --

"FlinkSortRule" -> "DataSetSortRule"




[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261103#comment-15261103
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349399
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
+
+package object dataset {
+
+  private[dataset] def getConversionMapper(config: TableConfig,
+   inputType: TypeInformation[Any],
+   expectedType: 
TypeInformation[Any],
+   conversionOperatorName: String,
+   fieldNames: Seq[String],
+   inputPojoFieldMapping: 
Option[Array[Int]] = None)
+  : MapFunction[Any, Any] = {
+
+val generator = new CodeGenerator(config,
--- End diff --

move first parameter to next line


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?





[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261105#comment-15261105
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349443
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSort}
+
+class DataSetSortRule
+  extends ConverterRule(
+classOf[LogicalSort],
+Convention.NONE,
+DataSetConvention.INSTANCE,
+"FlinkSortRule") {
--- End diff --

"FlinkSortRule" -> "DataSetSortRule"


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?





[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349224
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
+
+package object dataset {
+
+  private[dataset] def getConversionMapper(config: TableConfig,
+   inputType: TypeInformation[Any],
--- End diff --

We follow a different indentation style and also move the first parameter to 
the next line.




[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261099#comment-15261099
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61349224
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
+
+package object dataset {
+
+  private[dataset] def getConversionMapper(config: TableConfig,
+   inputType: TypeInformation[Any],
--- End diff --

We follow a different indentation style and also move the first parameter to 
the next line.


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?





[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261097#comment-15261097
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61348923
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
+
+package object dataset {
+
+  private[dataset] def getConversionMapper(config: TableConfig,
--- End diff --

Can you move this method to `DataSetRel`?


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?





[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261094#comment-15261094
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61348811
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inp: RelNode,
+collations: RelCollation,
+rowType2: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel{
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode ={
+new DataSetSort(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  collations,
+  rowType2
+)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]] = None): 
DataSet[Any] = {
+
+val config = tableEnv.getConfig
+
+val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+var partitionedDs = if (currentParallelism == 1) {
+  inputDS
+} else {
+  inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+.withOrders(fieldCollations.map(_._2): _*)
+}
+
+fieldCollations.foreach { fieldCollation =>
+  partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
+}
+
+val inputType = partitionedDs.getType
+expectedType match {
+
+  case None if config.getEfficientTypeUsage =>
+partitionedDs
+
+  case _ =>
+val determinedType = determineReturnType(
+  getRowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+// conversion
+if (determinedType != inputType) {
+
+  val mapFunc = getConversionMapper(config,
+partitionedDs.getType,
+determinedType,
+"DataSetSortConversion",
+getRowType.getFieldNames.asScala
+  )
+
+  partitionedDs.map(mapFunc)
+}
+// no conversion necessary, forward
+else {
+  partitionedDs
+}
+}
+  }
+
+  private def directionToOrder(direction: Direction) = {
+direction match {
+  case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => 
Order.ASCENDING
+  case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => 
Order.DESCENDING
+}
+
+  }
+
+  private val fieldCollations = collations.getFieldCollations.asScala
+.map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+  private val sortFieldsToString = fieldCollations
+.map(col => s"${rowType2.getFieldNames.get(col._1)} 
${col._2.

[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61348923
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
+
+package object dataset {
+
+  private[dataset] def getConversionMapper(config: TableConfig,
--- End diff --

Can you move this method to `DataSetRel`?




[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61348811
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inp: RelNode,
+collations: RelCollation,
+rowType2: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel{
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode ={
+new DataSetSort(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  collations,
+  rowType2
+)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]] = None): 
DataSet[Any] = {
+
+val config = tableEnv.getConfig
+
+val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+var partitionedDs = if (currentParallelism == 1) {
+  inputDS
+} else {
+  inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+.withOrders(fieldCollations.map(_._2): _*)
+}
+
+fieldCollations.foreach { fieldCollation =>
+  partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
+}
+
+val inputType = partitionedDs.getType
+expectedType match {
+
+  case None if config.getEfficientTypeUsage =>
+partitionedDs
+
+  case _ =>
+val determinedType = determineReturnType(
+  getRowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage)
+
+// conversion
+if (determinedType != inputType) {
+
+  val mapFunc = getConversionMapper(config,
+partitionedDs.getType,
+determinedType,
+"DataSetSortConversion",
+getRowType.getFieldNames.asScala
+  )
+
+  partitionedDs.map(mapFunc)
+}
+// no conversion necessary, forward
+else {
+  partitionedDs
+}
+}
+  }
+
+  private def directionToOrder(direction: Direction) = {
+direction match {
+  case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => 
Order.ASCENDING
+  case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => 
Order.DESCENDING
+}
+
+  }
+
+  private val fieldCollations = collations.getFieldCollations.asScala
+.map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+  private val sortFieldsToString = fieldCollations
+.map(col => s"${rowType2.getFieldNames.get(col._1)} 
${col._2.getShortName}" ).mkString(", ")
+
+  override def toString: String = s"Sort(by: $sortFieldsToString)"
+
+  override def explainTerms(pw: RelWriter) : RelWriter = {
+super.explainTerms(pw)
+  .item("by", sortFieldsToString)
 

[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261092#comment-15261092
 ] 

ASF GitHub Bot commented on FLINK-2946:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61348408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+abstract class Ordering extends UnaryExpression { self: Product =>
+}
+
+case class Asc(child: Expression) extends Ordering{
--- End diff --

`Ordering{` -> `Ordering {`


> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code 
> generation facilities, the Table API needs a sorting feature.
> I will implement it in the next few days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and then do a final sort on one node afterwards?





[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r61348408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+abstract class Ordering extends UnaryExpression { self: Product =>
+}
+
+case class Asc(child: Expression) extends Ordering{
--- End diff --

`Ordering{` -> `Ordering {`




[jira] [Commented] (FLINK-1827) Move test classes in test folders and fix scope of test dependencies

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260793#comment-15260793
 ] 

ASF GitHub Bot commented on FLINK-1827:
---

Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-215201773
  
Any idea what might be causing the error in FlinkML? I can't pinpoint it to 
any specific source file. 


> Move test classes in test folders and fix scope of test dependencies
> 
>
> Key: FLINK-1827
> URL: https://issues.apache.org/jira/browse/FLINK-1827
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 0.9
>Reporter: Flavio Pompermaier
>Priority: Minor
>  Labels: test-compile
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Right now it is not possible to avoid compilation of test classes 
> (-Dmaven.test.skip=true) because some projects (e.g. flink-test-utils) 
> require test classes in non-test sources (e.g. 
> scalatest_${scala.binary.version}).
> Test classes should be moved to src/test/java (if Java) and src/test/scala 
> (if Scala), and test dependencies should use scope=test.





[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-04-27 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-215201773
  
Any idea what might be causing the error in FlinkML? I can't pinpoint it to 
any specific source file. 




[jira] [Commented] (FLINK-3771) Methods for translating Graphs

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260573#comment-15260573
 ] 

ASF GitHub Bot commented on FLINK-3771:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1900#issuecomment-215167381
  
The switch to `MapFunction` means that a user can use a `RichMapFunction` 
with open, close, accumulators, etc.

Rather than adding six new methods to `Graph`, I implemented these as three 
`GraphAlgorithm`s which use the Gellyish `Graph.run(GraphAlgorithm)`.
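
A hedged sketch of that pattern (the class and method names below are assumptions for
illustration, not the PR's actual API): a translation packaged as a `GraphAlgorithm`
and invoked through `Graph.run(...)`; since the mapper is a `MapFunction`, a
`RichMapFunction` with open/close/accumulators could be passed just as well.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.{Graph, GraphAlgorithm, Vertex}

// Hypothetical translator: shifts every Long vertex value by a fixed delta.
class TranslateVertexValuesSketch[K, EV](delta: Long)
  extends GraphAlgorithm[K, java.lang.Long, EV, Graph[K, java.lang.Long, EV]]
  with Serializable {

  override def run(input: Graph[K, java.lang.Long, EV]): Graph[K, java.lang.Long, EV] = {
    val shift = delta // capture a local so the anonymous MapFunction stays serializable
    input.mapVertices(new MapFunction[Vertex[K, java.lang.Long], java.lang.Long] {
      override def map(v: Vertex[K, java.lang.Long]): java.lang.Long = v.getValue + shift
    })
  }
}

// Usage: val shifted = graph.run(new TranslateVertexValuesSketch(1000L))
```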


> Methods for translating Graphs
> --
>
> Key: FLINK-3771
> URL: https://issues.apache.org/jira/browse/FLINK-3771
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Provide methods for translation of the type or value of graph labels, vertex 
> values, and edge values.
> Sample use cases:
> * shifting graph labels in order to union generated graphs or graphs read 
> from multiple sources
> * downsizing labels or values since algorithms prefer to generate wide types 
> which may be expensive for further computation
> * changing label type for testing or benchmarking alternative code paths





[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

2016-04-27 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1900#issuecomment-215167381
  
The switch to `MapFunction` means that a user can use a `RichMapFunction` 
with open, close, accumulators, etc.

Rather than adding six new methods to `Graph`, I implemented these as three 
`GraphAlgorithm`s which use the Gellyish `Graph.run(GraphAlgorithm)`.




[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260493#comment-15260493
 ] 

Dawid Wysakowicz commented on FLINK-2971:
-

I encountered another problem regarding null handling. Right now the generated 
code for the FlatJoinFunction fails when one of the input elements is null. I saw 
some issues about null handling; is somebody working on that right now? Have 
any decisions been made about null handling?
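
For illustration, a hand-written outer-join function showing the kind of null guard the
generated FlatJoinFunction would need (plain tuples instead of the Table API's Row type,
purely as a sketch):

```scala
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.util.Collector

// For a left outer join, `right` is null when there is no matching element,
// so the function must check for null before accessing its fields.
class NullSafeLeftOuterJoin
  extends FlatJoinFunction[(Int, String), (Int, String), (Int, String, String)] {

  override def join(
      left: (Int, String),
      right: (Int, String),
      out: Collector[(Int, String, String)]): Unit = {
    val rightValue = if (right == null) null else right._2
    out.collect((left._1, left._2, rightValue))
  }
}
```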

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins, given that null values are properly supported by 
> the RowSerializer etc.





[jira] [Created] (FLINK-3839) Support wildcards in classpath parameters

2016-04-27 Thread Ken Krugler (JIRA)
Ken Krugler created FLINK-3839:
--

 Summary: Support wildcards in classpath parameters
 Key: FLINK-3839
 URL: https://issues.apache.org/jira/browse/FLINK-3839
 Project: Flink
  Issue Type: Improvement
Reporter: Ken Krugler
Priority: Minor


Currently you can only specify a single explicit jar with the CLI --classpath 
file:// parameter. Java (since 1.6) has allowed you to use -cp <dir>/* as a way 
of adding every file that ends in .jar in a directory.

This would simplify things, e.g. when running on EMR you have to add roughly 
120 jars explicitly, but these are all located in just two directories.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
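
A rough sketch of the kind of expansion the CLI could perform for such a wildcard entry; the class and method names are illustrative only and none of this is the actual Flink client code:

{code}
// Hypothetical sketch: expand "file:///path/to/lib/*" into one URL per .jar file in that directory.
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class ClasspathWildcards {

	public static List<URL> expand(String entry) throws Exception {
		List<URL> urls = new ArrayList<>();
		if (entry.endsWith("/*")) {
			// strip the trailing "/*" and list the jars in the directory it points to
			File dir = new File(new URL(entry.substring(0, entry.length() - 2)).toURI());
			File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
			if (jars != null) {
				for (File jar : jars) {
					urls.add(jar.toURI().toURL());
				}
			}
		} else {
			// plain, explicit entry as supported today
			urls.add(new URL(entry));
		}
		return urls;
	}
}
{code}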


[jira] [Created] (FLINK-3838) CLI parameter parser is munging application params

2016-04-27 Thread Ken Krugler (JIRA)
Ken Krugler created FLINK-3838:
--

 Summary: CLI parameter parser is munging application params
 Key: FLINK-3838
 URL: https://issues.apache.org/jira/browse/FLINK-3838
 Project: Flink
  Issue Type: Bug
  Components: Command-line client
Affects Versions: 1.0.2
Reporter: Ken Krugler
Priority: Minor


If parameters for an application use a single '-' (e.g. -maxtasks) then the CLI 
argument parser will munge these, and the app gets passed either just the 
parameter name (e.g. 'maxtasks') if the start of the parameter doesn't match a 
Flink parameter, or two values, with the first value being the part 
that matched (e.g. '-m') and the second value being the rest (e.g. 'axtasks').

The parser should ignore everything after the jar path parameter.

Note that using -- does seem to work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260456#comment-15260456
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215142967
  
Hi @yjshen, thanks for your patience. I also finished a first pass over the 
PR. 

I'd like to propose a third alternative, in addition to the custom 
validation phase (this PR) and generating `SqlNode`s and using Calcite's 
validator. Both approaches would mean that the validation happens before the 
logical plan is translated into a `RelNode`. I think it would be good to 
eagerly check each method call of the Table API. This would make debugging 
easier, because exceptions would be thrown where the error is caused. Please 
correct me if I am wrong, but I think we would not lose validation coverage 
compared to the coverage of this PR if we do eager validation? It might also be 
easier, because we do not need the recursive operator traversal (still the 
expression traversal though). Maybe we can even directly translate to 
`RelNode`s after validation, just like we do right now. I think a lot of this 
PR could be used for eager validation; I am not sure if it would be easily possible 
with the `SqlNode` validation approach. 
What do you think about eager validation, @yjshen and @twalthr?

While reviewing the PR, I noticed that some classes seem to be partially 
derived from Spark's code base (e.g., `TreeNode` and `RuleExecutor`). I know 
there are some common patterns that apply in optimizers, but it is good style 
to give credit to the original source code. 
Can you list which classes are derived from Spark code and add a comment to 
them pointing to the source of the code?

Thanks, Fabian


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215142967
  
Hi @yjshen, thanks for your patience. I also finished a first pass over the 
PR. 

I'd like to propose a third alternative, in addition to the custom 
validation phase (this PR) and generating `SqlNode`s and using Calcite's 
validator. Both approaches would mean that the validation happens before the 
logical plan is translated into a `RelNode`. I think it would be good to 
eagerly check each method call of the Table API. This would make debugging 
easier, because exceptions would be thrown where the error is caused. Please 
correct me if I am wrong, but I think we would not lose validation coverage 
compared to the coverage of this PR if we do eager validation? It might also be 
easier, because we do not need the recursive operator traversal (still the 
expression traversal though). Maybe we can even directly translate to 
`RelNode`s after validation, just like we do right now. I think a lot of this 
PR could be used for eager validation; I am not sure if it would be easily possible 
with the `SqlNode` validation approach. 
What do you think about eager validation, @yjshen and @twalthr?

While reviewing the PR, I noticed that some classes seem to be partially 
derived from Spark's code base (e.g., `TreeNode` and `RuleExecutor`). I know 
there are some common patterns that apply in optimizers, but it is good style 
to give credit to the original source code. 
Can you list which classes are derived from Spark code and add a comment to 
them pointing to the source of the code?

Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
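
To make the eager-validation idea above concrete, a minimal self-contained sketch of per-call validation; the class, the fieldNames set and the select signature are assumptions for illustration, not the flink-table code. The shape of the idea is simply that every API call checks its own inputs and throws at the call site that caused the problem:

{code}
// Hypothetical sketch (not the actual flink-table code): eager, per-call validation.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class EagerlyValidatedTable {

	private final Set<String> fieldNames;

	public EagerlyValidatedTable(String... fieldNames) {
		this.fieldNames = new HashSet<>(Arrays.asList(fieldNames));
	}

	/** Each API call validates its own arguments and fails immediately. */
	public EagerlyValidatedTable select(String... fields) {
		for (String field : fields) {
			if (!fieldNames.contains(field)) {
				throw new IllegalArgumentException(
					"Unknown field '" + field + "', available fields: " + fieldNames);
			}
		}
		// only after validation would the logical plan / RelNode be extended
		return new EagerlyValidatedTable(fields);
	}
}
{code}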


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260447#comment-15260447
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291959
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

Why did you exclude these tests?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291959
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

Why did you exclude these tests?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260442#comment-15260442
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
+*/
+  def dataType: TypeInformation[_]
+
+  /**
+* One pass validation of the expression tree in post order.
+*/
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+* Check input data types, inputs number or other properties specified 
by this expression.
+* Return `ValidationSuccess` if it pass the check,
+* or `ValidationFailure` with supplement message explaining the error.
+* Note: we should only call this method until `childrenValidated == 
true`
--- End diff --

`childrenValidated` -> `childrenValid`?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
+*/
+  def dataType: TypeInformation[_]
+
+  /**
+* One pass validation of the expression tree in post order.
+*/
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+* Check input data types, inputs number or other properties specified 
by this expression.
+* Return `ValidationSuccess` if it pass the check,
+* or `ValidationFailure` with supplement message explaining the error.
+* Note: we should only call this method until `childrenValidated == 
true`
--- End diff --

`childrenValidated` -> `childrenValid`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260439#comment-15260439
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291698
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
--- End diff --

+not ?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291698
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
--- End diff --

+not ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291621
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -355,4 +380,26 @@ object TableEnvironment {
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
 
+  /**
+* The primary workflow for executing plan validation for that 
generated from Table API.
+* The validation is intentionally designed as a lazy procedure and 
triggered when we
+* are going to run on Flink core.
+*/
+  class PlanPreparation(val env: TableEnvironment, val logical: 
LogicalNode) {
+
+lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical)
+
+def validate(): Unit = env.getValidator.validate(resolvedPlan)
+
+lazy val relNode: RelNode = {
+  env match {
+case _: BatchTableEnvironment =>
--- End diff --

Why do you distinguish here? It's the same code, no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260438#comment-15260438
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291621
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -355,4 +380,26 @@ object TableEnvironment {
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
 
+  /**
+* The primary workflow for executing plan validation for that 
generated from Table API.
+* The validation is intentionally designed as a lazy procedure and 
triggered when we
+* are going to run on Flink core.
+*/
+  class PlanPreparation(val env: TableEnvironment, val logical: 
LogicalNode) {
+
+lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical)
+
+def validate(): Unit = env.getValidator.validate(resolvedPlan)
+
+lazy val relNode: RelNode = {
+  env match {
+case _: BatchTableEnvironment =>
--- End diff --

Why do you distinguish here? It's the same code, no?


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask

2016-04-27 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260431#comment-15260431
 ] 

Konstantin Knauf commented on FLINK-3669:
-

I made the changes as suggested, and will open a PR after the travis build 
later this evening. 

Unfortunately, I will not be able to test it with the original application 
(customer code).

I could try to reproduce the issue locally and test it with the new version, 
but this will take at least till next weekend as I can only do that on my own 
time.

> WindowOperator registers a lot of timers at StreamTask
> --
>
> Key: FLINK-3669
> URL: https://issues.apache.org/jira/browse/FLINK-3669
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.1
>Reporter: Aljoscha Krettek
>Assignee: Konstantin Knauf
>Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every 
> processing-time timer that a Trigger registers. We should combine several 
> registered trigger timers to only register one low-level timer (timer 
> coalescing).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters

2016-04-27 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260391#comment-15260391
 ] 

Fabian Hueske commented on FLINK-3836:
--

Sure, done.
I also gave you contributor permissions for Flink's JIRA. You can now assign 
issues to yourself.

Cheers, Fabian

> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be <Integer, Long>.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3836) Change Histogram to enable Long counters

2016-04-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-3836:
-
Assignee: Maximilian Bode

> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be <Integer, Long>.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters

2016-04-27 Thread Maximilian Bode (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260382#comment-15260382
 ] 

Maximilian Bode commented on FLINK-3836:


I would like to take care of this, can someone assign it to me?

> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be <Integer, Long>.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260305#comment-15260305
 ] 

Dawid Wysakowicz commented on FLINK-2971:
-

Yes, you're right. 9 methods.

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260304#comment-15260304
 ] 

Fabian Hueske commented on FLINK-2971:
--

Should be 9 methods because the outer joins require the predicates, right?

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260294#comment-15260294
 ] 

Dawid Wysakowicz commented on FLINK-2971:
-

I started with that idea; one small disadvantage I saw is that it will result 
in 4 (3 outer + inner) * 3 (no joinPredicate, as Expression, as String) = 12 
methods. But it would probably be more user friendly. So if you're ok with it, 
I will change it.

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61273003
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector(output);
+   this.checkpointLock = getContainingTask()
+   .getCheckpointLock();
+   this.serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+   this.reader = new SplitReader(unReadSplits, format, serializer, 
collector, checkpointLock);
+   this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord element) throws 
Exception {
+   LOG.info("Reading Split: " + element.getValue());
+   this.unReadSplits.add(e
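
A minimal sketch of the reader-thread idea described in the operator's javadoc above: a separate thread drains the queue of pending splits and emits records while holding the checkpoint lock, so checkpoints see a consistent state. Class and field names are illustrative and not the PR's actual SplitReader:

{code}
// Hypothetical sketch: a dedicated thread reads queued splits and emits under the checkpoint lock.
import java.util.Queue;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.streaming.api.operators.TimestampedCollector;

class SplitReaderSketch<OUT> extends Thread {

	private final Queue<FileInputSplit> pendingSplits;
	private final FileInputFormat<OUT> format;
	private final TimestampedCollector<OUT> collector;
	private final Object checkpointLock;
	private volatile boolean running = true;

	SplitReaderSketch(Queue<FileInputSplit> pendingSplits, FileInputFormat<OUT> format,
			TimestampedCollector<OUT> collector, Object checkpointLock) {
		this.pendingSplits = pendingSplits;
		this.format = format;
		this.collector = collector;
		this.checkpointLock = checkpointLock;
	}

	@Override
	public void run() {
		try {
			while (running) {
				FileInputSplit split = pendingSplits.poll();
				if (split == null) {
					Thread.sleep(50); // nothing queued yet
					continue;
				}
				format.open(split);
				while (!format.reachedEnd()) {
					// emit under the checkpoint lock so checkpoints reflect the current state
					synchronized (checkpointLock) {
						OUT record = format.nextRecord(null);
						if (record != null) {
							collector.collect(record);
						}
					}
				}
				format.close();
			}
		} catch (Exception e) {
			throw new RuntimeException("Error while reading split", e);
		}
	}
}
{code}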

[jira] [Created] (FLINK-3837) Create FLOOR/CEIL function

2016-04-27 Thread Timo Walther (JIRA)
Timo Walther created FLINK-3837:
---

 Summary: Create FLOOR/CEIL function
 Key: FLINK-3837
 URL: https://issues.apache.org/jira/browse/FLINK-3837
 Project: Flink
  Issue Type: Sub-task
  Components: Table API
Reporter: Timo Walther
Assignee: Timo Walther
Priority: Minor


Create the FLOOR/CEIL function for Table API and SQL. They will later be 
extended in FLINK-3580 to support date and time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260280#comment-15260280
 ] 

Vasia Kalavri commented on FLINK-2971:
--

+1 I was thinking the same!

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260270#comment-15260270
 ] 

Fabian Hueske commented on FLINK-2971:
--

I think it would be more intuitive to offer explicit outer join methods 
({{joinLeftOuter(right, joinPredicate)}}, {{joinRightOuter(right, 
joinPredicate)}}, and {{joinFullOuter(right, joinPredicate)}},) instead of 
overloading {{join}} with a join hint parameter.

What do you think?


> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
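
For reference, a sketch of the nine signatures this proposal would amount to; the method names are the ones suggested in this thread, everything else (the enclosing interface, imports) is illustrative only and not an agreed-upon design:

{code}
// Hypothetical sketch of the resulting overloads:
// 3 inner-join variants + 3 outer joins x 2 predicate forms = 9 methods.
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.expressions.Expression;

public interface JoinableTable {
	Table join(Table right);                                     // inner, no predicate
	Table join(Table right, String joinPredicate);               // inner, String predicate
	Table join(Table right, Expression joinPredicate);           // inner, Expression predicate
	Table joinLeftOuter(Table right, String joinPredicate);      // outer joins always need a predicate
	Table joinLeftOuter(Table right, Expression joinPredicate);
	Table joinRightOuter(Table right, String joinPredicate);
	Table joinRightOuter(Table right, Expression joinPredicate);
	Table joinFullOuter(Table right, String joinPredicate);
	Table joinFullOuter(Table right, Expression joinPredicate);
}
{code}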


[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...

2016-04-27 Thread gna-phetsarath
Github user gna-phetsarath commented on a diff in the pull request:

https://github.com/apache/flink/pull/1920#discussion_r61269058
  
--- Diff: 
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
 ---
@@ -289,6 +290,119 @@ public void testDeserializeToSpecificType() throws 
IOException {
}
}
 
+   /**
+* Test if the AvroInputFormat is able to properly read data from an 
Avro
+* file as a GenericRecord.
+* 
+* @throws IOException,
+* if there is an exception
+*/
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testDeserialisationGenericRecord() throws IOException {
+   Configuration parameters = new Configuration();
+
+   AvroInputFormat<GenericRecord> format = new 
AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
+   GenericRecord.class);
+   try {
+   format.configure(parameters);
+   FileInputSplit[] splits = format.createInputSplits(1);
+   assertEquals(splits.length, 1);
+   format.open(splits[0]);
+
+   GenericRecord u = format.nextRecord(null);
--- End diff --

```AvroInputFormat``` does not have a ```nextRecord()``` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters

2016-04-27 Thread Maximilian Bode (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260259#comment-15260259
 ] 

Maximilian Bode commented on FLINK-3836:


Good point, I guess this would be better.

> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be <Integer, Long>.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters

2016-04-27 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260255#comment-15260255
 ] 

Greg Hogan commented on FLINK-3836:
---

{{Histogram}} is marked as part of the {{@Public}} API which can only be broken 
on new major releases. Could something like an equivalent {{LongHistogram}} be 
added instead?

> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be <Integer, Long>.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
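
A rough sketch of what such a separate accumulator could look like, using the name LongHistogram from the comment above; this is illustrative only and not an agreed-upon design:

{code}
// Hypothetical sketch: a histogram accumulator with long counts instead of int counts.
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.common.accumulators.Accumulator;

public class LongHistogram implements Accumulator<Integer, TreeMap<Integer, Long>> {

	private TreeMap<Integer, Long> counts = new TreeMap<>();

	@Override
	public void add(Integer value) {
		Long current = counts.get(value);
		counts.put(value, current == null ? 1L : current + 1L);
	}

	@Override
	public TreeMap<Integer, Long> getLocalValue() {
		return counts;
	}

	@Override
	public void resetLocal() {
		counts.clear();
	}

	@Override
	public void merge(Accumulator<Integer, TreeMap<Integer, Long>> other) {
		for (Map.Entry<Integer, Long> entry : other.getLocalValue().entrySet()) {
			Long current = counts.get(entry.getKey());
			counts.put(entry.getKey(),
				current == null ? entry.getValue() : current + entry.getValue());
		}
	}

	@Override
	public Accumulator<Integer, TreeMap<Integer, Long>> clone() {
		LongHistogram copy = new LongHistogram();
		copy.counts = new TreeMap<>(counts);
		return copy;
	}
}
{code}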


[jira] [Commented] (FLINK-3444) env.fromElements relies on the first input element for determining the DataSet/DataStream type

2016-04-27 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260250#comment-15260250
 ] 

Chesnay Schepler commented on FLINK-3444:
-

It's the easiest and safest solution that gets the job done. At the time of the 
PR I already thought about your suggestion, but came to the conclusion that 
there can probably be cases where we would find the "wrong" super class. Then 
we would need the newly introduced method anyway.

> env.fromElements relies on the first input element for determining the 
> DataSet/DataStream type
> --
>
> Key: FLINK-3444
> URL: https://issues.apache.org/jira/browse/FLINK-3444
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 0.10.0, 1.0.0
>Reporter: Vasia Kalavri
> Fix For: 1.1.0
>
>
> The {{fromElements}} method of the {{ExecutionEnvironment}} and 
> {{StreamExecutionEnvironment}} determines the DataSet/DataStream type by 
> extracting the type of the first input element.
> This is problematic if the first element is a subtype of another element in 
> the collection.
> For example, the following
> {code}
> DataStream<Event> input = env.fromElements(new Event(1, "a"), new SubEvent(2, 
> "b"));
> {code}
> succeeds, while the following
> {code}
> DataStream<Event> input = env.fromElements(new SubEvent(1, "a"), new Event(2, 
> "b"));
> {code}
> fails with "java.lang.IllegalArgumentException: The elements in the 
> collection are not all subclasses of SubEvent".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3836) Change Histogram to enable Long counters

2016-04-27 Thread Maximilian Bode (JIRA)
Maximilian Bode created FLINK-3836:
--

 Summary: Change Histogram to enable Long counters
 Key: FLINK-3836
 URL: https://issues.apache.org/jira/browse/FLINK-3836
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.0.2
Reporter: Maximilian Bode
Priority: Minor


Change 
flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
 to enable Long counts instead of Integer. In particular, change the TreeMap to 
be <Integer, Long>.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2828) Add interfaces for Table API input formats

2016-04-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-2828.

   Resolution: Implemented
Fix Version/s: 1.1.0

Implemented for 1.1.0 with 4f5dbc2edcd3e3a403f2ecfe0cc0bdd95b26b177

> Add interfaces for Table API input formats
> --
>
> Key: FLINK-2828
> URL: https://issues.apache.org/jira/browse/FLINK-2828
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Fabian Hueske
> Fix For: 1.1.0
>
>
> In order to support input formats for the Table API, interfaces are 
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the 
> plan. Although the output schema stays the same, the TableSource can react on 
> field resolution and/or predicates internally and can return adapted 
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional 
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment 
> (common super class of all ExecutionEnvironments for DataSets and 
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), 
> ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("c", true))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3835) JSON execution plan not helpful to debug plans with KeySelectors

2016-04-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3835.

Resolution: Fixed

Fixed for 1.0.3 with 4a34f6f7342c3f40991a4d9e56f17811bad15d62 
Fixed for 1.1.0 with 80b09eaefe8f000f79f017ba822a631b8c29b5be 

> JSON execution plan not helpful to debug plans with KeySelectors
> 
>
> Key: FLINK-3835
> URL: https://issues.apache.org/jira/browse/FLINK-3835
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.1.0, 1.0.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.1.0, 1.0.3
>
>
> The JSON execution plans are not helpful when debugging plans that include 
> join operators with key selectors. For joins with hash join strategy, the 
> driver strategy shows: {{"Hybrid Hash (build: Key Extractor)"}} where 
> {{(build: Key Extractor)}} shall help to identify the build side of the join. 
> However, if both inputs use KeySelectors, the build side cannot be identified.
> I propose to add the operator id to the build side information. The same 
> issue applied for cross driver strategies.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2828] [tableAPI] Add TableSource interf...

2016-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1939


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260225#comment-15260225
 ] 

ASF GitHub Bot commented on FLINK-2828:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1939


> Add interfaces for Table API input formats
> --
>
> Key: FLINK-2828
> URL: https://issues.apache.org/jira/browse/FLINK-2828
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Fabian Hueske
>
> In order to support input formats for the Table API, interfaces are 
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the 
> plan. Although the output schema stays the same, the TableSource can react on 
> field resolution and/or predicates internally and can return adapted 
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional 
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment 
> (common super class of all ExecutionEnvironments for DataSets and 
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), 
> ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("c", true))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260223#comment-15260223
 ] 

ASF GitHub Bot commented on FLINK-2828:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1939#issuecomment-215096897
  
Merging


> Add interfaces for Table API input formats
> --
>
> Key: FLINK-2828
> URL: https://issues.apache.org/jira/browse/FLINK-2828
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Fabian Hueske
>
> In order to support input formats for the Table API, interfaces are 
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the 
> plan. Although the output schema stays the same, the TableSource can react on 
> field resolution and/or predicates internally and can return adapted 
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional 
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment 
> (common super class of all ExecutionEnvironments for DataSets and 
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), 
> ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("c", true))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2828] [tableAPI] Add TableSource interf...

2016-04-27 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1939#issuecomment-215096897
  
Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...

2016-04-27 Thread gna-phetsarath
Github user gna-phetsarath commented on the pull request:

https://github.com/apache/flink/pull/1920#issuecomment-215092007
  
ReflectDatumReader does not work with GenericRecord because it is an 
interface, so you need to use GenericDatumReader.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
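
A small sketch of the distinction being made here, using Avro's standard reader classes; the surrounding AvroInputFormat wiring is omitted and the helper name is an assumption for illustration:

{code}
// Hypothetical sketch: pick the DatumReader based on whether we read GenericRecord or a reflect type.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;

public class DatumReaderChoice {

	@SuppressWarnings("unchecked")
	public static <E> DatumReader<E> readerFor(Class<E> type, Schema schema) {
		if (GenericRecord.class.isAssignableFrom(type)) {
			// GenericRecord is an interface, so reflection-based reading cannot instantiate it;
			// the generic reader materializes GenericData.Record instances instead
			return (DatumReader<E>) new GenericDatumReader<GenericRecord>(schema);
		} else {
			return new ReflectDatumReader<E>(type);
		}
	}
}
{code}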


[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...

2016-04-27 Thread gna-phetsarath
Github user gna-phetsarath commented on a diff in the pull request:

https://github.com/apache/flink/pull/1920#discussion_r61261060
  
--- Diff: 
flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 ---
@@ -119,12 +144,18 @@ public E nextRecord(E reuseValue) throws IOException {
if (reachedEnd()) {
return null;
}
-   
-   if (!reuseAvroValue) {
-   reuseValue = 
InstantiationUtil.instantiate(avroValueType, Object.class);
+   if (isGenericRecord) {
--- End diff --

OK. I'll make the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask

2016-04-27 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260171#comment-15260171
 ] 

Aljoscha Krettek commented on FLINK-3669:
-

I think we have a winner now :-) Could you also test whether it solves your 
problem when running your job?

It's also cool that you added a test for this. I changed it a bit to go through 
the complete checkpointing and operator lifecycle:
{code}
@Test
public void testRestoreAndSnapshotAreInSync() throws Exception {

final int WINDOW_SIZE = 3;
final int WINDOW_SLIDE = 1;

TypeInformation> inputType = 
TypeInfoParser.parse("Tuple2");

ReducingStateDescriptor> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
new SumReducer(),
inputType.createSerializer(new 
ExecutionConfig()));

WindowOperator, Tuple2, Tuple2, TimeWindow> operator = new WindowOperator<>(
SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),

BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction>()),
EventTimeTrigger.create());


OneInputStreamOperatorTestHarness, 
Tuple2> testHarness =
new 
OneInputStreamOperatorTestHarness<>(operator);

testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);

operator.setInputType(inputType, new ExecutionConfig());
testHarness.open();

WindowOperator.Timer timer1 = new 
WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L));
WindowOperator.Timer timer2 = new 
WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L));
WindowOperator.Timer timer3 = new 
WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L));
operator.processingTimeTimers.add(timer1);
operator.processingTimeTimers.add(timer2);
operator.processingTimeTimers.add(timer3);
operator.processingTimeTimersQueue.add(timer1);
operator.processingTimeTimersQueue.add(timer2);
operator.processingTimeTimersQueue.add(timer3);

operator.processingTimeTimerTimestamps.add(1L, 10);
operator.processingTimeTimerTimestamps.add(2L, 5);
operator.processingTimeTimerTimestamps.add(3L, 1);


StreamTaskState snapshot = testHarness.snapshot(0, 0);

WindowOperator, Tuple2, Tuple2, TimeWindow> otherOperator = new 
WindowOperator<>(
SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),

BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction>()),
EventTimeTrigger.create());

OneInputStreamOperatorTestHarness, 
Tuple2> otherTestHarness =
new 
OneInputStreamOperatorTestHarness<>(otherOperator);

otherTestHarness.configureForKeyedStream(new 
TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
otherOperator.setInputType(inputType, new ExecutionConfig());

otherTestHarness.setup();
otherTestHarness.restore(snapshot, 0);
otherTestHarness.open();

Assert.assertEquals(operator.processingTimeTimers, 
otherOperator.processingTimeTimers);

Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), 
otherOperator.processingTimeTimersQueue.toArray());
Assert.assertEquals(operator.processingTimeTimerTimestamps, 
otherOperator.processingTimeTimerTimestamps);
}
{code}

The test was good already but this is just something I would know since I wrote 
the test harnesses and the other operator tests. :-) This way, we also don't 
need the special {{restoreStateFrom}} and {{snapshotStateTo}} methods.

Will you open a PR?

> WindowOperator registers a lot of timers at StreamTask
> --
>
> Key: FLINK-3669
> 
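
For readers following this thread, a condensed sketch of the coalescing idea itself, assuming a hypothetical wrapper around the task's low-level timer facility; all names are illustrative and the actual fix lives inside WindowOperator:

{code}
// Hypothetical sketch: register only one low-level timer per distinct timestamp.
import java.util.HashMap;
import java.util.Map;

public class CoalescingTimerSketch {

	/** How many logical trigger timers exist per timestamp. */
	private final Map<Long, Integer> timersPerTimestamp = new HashMap<>();

	private final LowLevelTimerService timerService;

	public CoalescingTimerSketch(LowLevelTimerService timerService) {
		this.timerService = timerService;
	}

	public void registerTriggerTimer(long timestamp) {
		Integer count = timersPerTimestamp.get(timestamp);
		if (count == null) {
			// first logical timer for this timestamp -> exactly one low-level timer
			timersPerTimestamp.put(timestamp, 1);
			timerService.registerTimer(timestamp);
		} else {
			timersPerTimestamp.put(timestamp, count + 1);
		}
	}

	public void deleteTriggerTimer(long timestamp) {
		Integer count = timersPerTimestamp.get(timestamp);
		if (count != null) {
			if (count == 1) {
				timersPerTimestamp.remove(timestamp);
			} else {
				timersPerTimestamp.put(timestamp, count - 1);
			}
		}
	}

	/** Stand-in for the StreamTask timer facility; only here to keep the sketch self-contained. */
	public interface LowLevelTimerService {
		void registerTimer(long timestamp);
	}
}
{code}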

[jira] [Assigned] (FLINK-3834) flink-statebackend-rocksdb

2016-04-27 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-3834:
---

Assignee: Chesnay Schepler

> flink-statebackend-rocksdb
> --
>
> Key: FLINK-3834
> URL: https://issues.apache.org/jira/browse/FLINK-3834
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1284) Uniform random sampling operator over windows

2016-04-27 Thread Austin Ouyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260129#comment-15260129
 ] 

Austin Ouyang commented on FLINK-1284:
--

Hi [~till.rohrmann],

Thanks! Really excited to get started on this.

> Uniform random sampling operator over windows
> -
>
> Key: FLINK-1284
> URL: https://issues.apache.org/jira/browse/FLINK-1284
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Paris Carbone
>Assignee: Austin Ouyang
>Priority: Minor
>
> It would be useful for several use cases to have a built-in uniform random 
> sampling operator in the streaming API that can operate on windows. This can 
> be used for example for online machine learning operations, evaluating 
> heuristics or continuous visualisation of representative values.
> The operator could be given a field and a number of random samples needed, 
> following a window statement as such:
> mystream.window(..).sample(fieldID,#samples)
> Given that pre-aggregation is enabled, this could perhaps be implemented as a 
> binary reduce operator or a combinable groupreduce that pre-aggregates the 
> empiricals of that field.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260112#comment-15260112
 ] 

Fabian Hueske commented on FLINK-2971:
--

Hi Dawid,

I think the approach is good. A few comments on the WIP branch:

- I'd rather call the {{onColumns}} parameter {{joinPredicate}} to indicate 
that a predicate (boolean expression) is expected.
- Does Calcite accept non join predicates or do we have to check for that?
- Is the partial function actually a partial function if the last case catches 
everything?
- I think we can rename {{UnresolvedFieldReference}} to {{FieldReference}} and 
remove {{ResolvedFieldReference}} which is not used anymore. Hence 
{{MultiInputUnresolvedFieldReference}} can also be shorter.

Thanks for working on this!

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3444) env.fromElements relies on the first input element for determining the DataSet/DataStream type

2016-04-27 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260115#comment-15260115
 ] 

Till Rohrmann commented on FLINK-3444:
--

I'm wondering why the above-described problem was solved by introducing an 
overloaded {{fromElements}} method which takes the {{Class}} of the elements as 
its first parameter. The problem is that if you do something like 
{{env.fromElements(SubClass.class, new ParentClass())}}, then the vararg-only 
variant of {{fromElements}} will be called with type {{Object}} as the result 
(since {{Object}} is the common super type of {{Class}} and {{ParentClass}}). The 
reason is that {{ParentClass}} is not a subtype of {{SubClass}} and thus the 
new {{fromElements}} method is not applicable.

Wouldn't it have been better to automatically infer the super type of all given 
elements? Usually the {{fromElements}} method is only called for very few 
elements and, thus, it should be feasible to iterate over all elements to 
extract the common super type. 
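
A minimal sketch of such an inference, assuming we simply walk up the class 
hierarchy until all elements fit (illustrative only, not the actual 
{{fromElements}} implementation):

{code}
// Hypothetical sketch: find the most specific common superclass of all elements.
public static Class<?> commonSuperClass(Object... elements) {
    Class<?> candidate = elements[0].getClass();
    for (Object element : elements) {
        // walk up the hierarchy until the candidate accepts this element
        while (!candidate.isAssignableFrom(element.getClass())) {
            candidate = candidate.getSuperclass();
        }
    }
    return candidate;
}
{code}

With this, {{commonSuperClass(new SubClass(), new ParentClass())}} would yield 
{{ParentClass}} regardless of the element order.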

> env.fromElements relies on the first input element for determining the 
> DataSet/DataStream type
> --
>
> Key: FLINK-3444
> URL: https://issues.apache.org/jira/browse/FLINK-3444
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 0.10.0, 1.0.0
>Reporter: Vasia Kalavri
> Fix For: 1.1.0
>
>
> The {{fromElements}} method of the {{ExecutionEnvironment}} and 
> {{StreamExecutionEnvironment}} determines the DataSet/DataStream type by 
> extracting the type of the first input element.
> This is problematic if the first element is a subtype of another element in 
> the collection.
> For example, the following
> {code}
> DataStream input = env.fromElements(new Event(1, "a"), new SubEvent(2, 
> "b"));
> {code}
> succeeds, while the following
> {code}
> DataStream input = env.fromElements(new SubEvent(1, "a"), new Event(2, 
> "b"));
> {code}
> fails with "java.lang.IllegalArgumentException: The elements in the 
> collection are not all subclasses of SubEvent".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3601) JobManagerTest times out on StopSignal test

2016-04-27 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-3601.
---
   Resolution: Fixed
Fix Version/s: 1.1.0

Thanks for the reminder! Had a fix already.

Increased timeout with 130a22d6c8839018068a8a000b4a2ed6a1ab3d49.

> JobManagerTest times out on StopSignal test
> ---
>
> Key: FLINK-3601
> URL: https://issues.apache.org/jira/browse/FLINK-3601
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: test-stability
> Fix For: 1.1.0
>
>
> {noformat}
> testStopSignal(org.apache.flink.runtime.jobmanager.JobManagerTest)  Time 
> elapsed: 8.018 sec  <<< FAILURE!
> java.lang.AssertionError: assertion failed: timeout (3 seconds) during 
> expectMsgClass waiting for class 
> org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423)
>   at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410)
>   at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718)
>   at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerTest$2.(JobManagerTest.java:273)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerTest.testStopSignal(JobManagerTest.java:237)
> {noformat}
> I propose to replace the default timeout per {{expectMsg}} of 3 seconds with 
> a total test duration of 15 seconds.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3824) ResourceManager may repeatedly connect to outdated JobManager in HA mode

2016-04-27 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-3824.
-
Resolution: Fixed

Fixed with 7ec1300a740e9b8e1eb595bc15e752a167592e00.

> ResourceManager may repeatedly connect to outdated JobManager in HA mode
> 
>
> Key: FLINK-3824
> URL: https://issues.apache.org/jira/browse/FLINK-3824
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.1.0
>
>
> When the ResourceManager receives a new leading JobManager via the 
> LeaderRetrievalService it tries to register with this JobManager until 
> connected. If during registration a new leader gets elected, the 
> ResourceManager may still repeatedly try to register with the old one. This 
> doesn't affect the registration with the new JobManager but leaves error 
> messages in the log file and may process unnecessary messages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask

2016-04-27 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260086#comment-15260086
 ] 

Aljoscha Krettek commented on FLINK-3669:
-

Thanks! I'll have a look at your branch. For the test-stability, I think there 
is some problem with the maven caches on Travis. We're waiting for the Infra 
team to respond.

> WindowOperator registers a lot of timers at StreamTask
> --
>
> Key: FLINK-3669
> URL: https://issues.apache.org/jira/browse/FLINK-3669
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.1
>Reporter: Aljoscha Krettek
>Assignee: Konstantin Knauf
>Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every 
> processing-time timer that a Trigger registers. We should combine several 
> registered trigger timers to only register one low-level timer (timer 
> coalescing).
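
A minimal sketch of the coalescing idea (illustrative names only, not the 
actual WindowOperator/StreamTask code): keep the trigger timestamps in a sorted 
set and only register a low-level timer for the earliest outstanding one.

{code}
// Hypothetical sketch of timer coalescing: many trigger timers, few low-level timers.
import java.util.TreeSet;

public class CoalescingTimers {

    private final TreeSet<Long> triggerTimes = new TreeSet<>();
    private long earliestRegistered = Long.MAX_VALUE;

    /** Called for every processing-time timer a Trigger registers. */
    public void registerTriggerTimer(long timestamp) {
        triggerTimes.add(timestamp);
        if (timestamp < earliestRegistered) {
            // only go to the low-level service when a new earliest timestamp appears
            earliestRegistered = timestamp;
            registerLowLevelTimer(timestamp);
        }
    }

    /** Called when the low-level timer fires. */
    public void onLowLevelTimer(long time) {
        // fire all trigger timers that are due
        while (!triggerTimes.isEmpty() && triggerTimes.first() <= time) {
            fireTriggerTimer(triggerTimes.pollFirst());
        }
        if (triggerTimes.isEmpty()) {
            earliestRegistered = Long.MAX_VALUE;
        } else {
            earliestRegistered = triggerTimes.first();
            registerLowLevelTimer(earliestRegistered);
        }
    }

    private void registerLowLevelTimer(long timestamp) { /* e.g. StreamTask.registerTimer(...) */ }

    private void fireTriggerTimer(long timestamp) { /* e.g. Trigger.onProcessingTime(...) */ }
}
{code}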



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3835) JSON execution plan not helpful to debug plans with KeySelectors

2016-04-27 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3835:


 Summary: JSON execution plan not helpful to debug plans with 
KeySelectors
 Key: FLINK-3835
 URL: https://issues.apache.org/jira/browse/FLINK-3835
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Affects Versions: 1.0.2, 1.1.0
Reporter: Fabian Hueske
Assignee: Fabian Hueske
 Fix For: 1.1.0, 1.0.3


The JSON execution plans are not helpful when debugging plans that include join 
operators with key selectors. For joins with the hash join strategy, the driver 
strategy shows {{"Hybrid Hash (build: Key Extractor)"}}, where {{(build: Key 
Extractor)}} is meant to identify the build side of the join. However, if 
both inputs use KeySelectors, the build side cannot be identified.

I propose to add the operator id to the build side information. The same issue 
applies to cross driver strategies.
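
For illustration only, the build side hint could then look something like the 
following (the exact rendering is an assumption, not the implemented format):

{noformat}
Hybrid Hash (build: Key Extractor)          <- today: ambiguous when both inputs use KeySelectors
Hybrid Hash (build: Key Extractor (id: 3))  <- with the operator id added
{noformat}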



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260016#comment-15260016
 ] 

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215061267
  
@yjshen I looked through the code changes. I was quite impressed that your 
PR touches nearly every class of the current API.

Basically your changes seem to work; however, I'm not sure if we want to 
implement the validation phase for every scalar function ourselves. Actually, 
Calcite already comes with type inference, type checking and validation 
capabilities. I don't know if we want to reinvent the wheel at this point. Your 
approach inserts a layer under the Table API for doing the validation. However, 
instead, this layer could also translate the plan into a SQL tree (on top of 
RelNodes). We could then let Calcite do the work of validation.

This could also solve another problem that I faced when working on 
FLINK-3580. If you take a look at `StandardConvertletTable` of Calcite, you see 
that Calcite also does some conversions which we also need to implement 
ourselves if we do not base the Table API on top of SQL.

We need to discuss how we want to proceed. Neither solution is perfect.


> Add a validation phase before construct RelNode using TableAPI
> --
>
> Key: FLINK-3754
> URL: https://issues.apache.org/jira/browse/FLINK-3754
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Affects Versions: 1.0.0
>Reporter: Yijie Shen
>Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before 
> RelNode construction, Table API lacks the counterparts and the validation is 
> scattered in many places.
> I suggest to add a single validation phase and detect problems as early as 
> possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215061267
  
@yjshen I looked through the code changes. I was quite impressed that your 
PR touches nearly every class of the current API.

Basically your changes seem to work; however, I'm not sure if we want to 
implement the validation phase for every scalar function ourselves. Actually, 
Calcite already comes with type inference, type checking and validation 
capabilities. I don't know if we want to reinvent the wheel at this point. Your 
approach inserts a layer under the Table API for doing the validation. However, 
instead, this layer could also translate the plan into a SQL tree (on top of 
RelNodes). We could then let Calcite do the work of validation.

This could also solve another problem that I faced when working on 
FLINK-3580. If you take a look at `StandardConvertletTable` of Calcite, you see 
that Calcite also does some conversions which we also need to implement 
ourselves if we do not base the Table API on top of SQL.

We need to discuss how we want to proceed. Neither solution is perfect.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3601) JobManagerTest times out on StopSignal test

2016-04-27 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260002#comment-15260002
 ] 

Matthias J. Sax commented on FLINK-3601:


[~mxm] Are you working on this? If not, I could fix this.

> JobManagerTest times out on StopSignal test
> ---
>
> Key: FLINK-3601
> URL: https://issues.apache.org/jira/browse/FLINK-3601
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: test-stability
>
> {noformat}
> testStopSignal(org.apache.flink.runtime.jobmanager.JobManagerTest)  Time 
> elapsed: 8.018 sec  <<< FAILURE!
> java.lang.AssertionError: assertion failed: timeout (3 seconds) during 
> expectMsgClass waiting for class 
> org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423)
>   at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410)
>   at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718)
>   at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerTest$2.(JobManagerTest.java:273)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerTest.testStopSignal(JobManagerTest.java:237)
> {noformat}
> I propose to replace the default timeout per {{expectMsg}} of 3 seconds with 
> a total test duration of 15 seconds.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase

2016-04-27 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526
 ] 

Matthias J. Sax commented on FLINK-3380:


[~uce] Can this be closed? PR1611 was merged long ago.

> Unstable Test: JobSubmissionFailsITCase
> ---
>
> Key: FLINK-3380
> URL: https://issues.apache.org/jira/browse/FLINK-3380
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2971) Add outer joins to the Table API

2016-04-27 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259992#comment-15259992
 ] 

Dawid Wysakowicz commented on FLINK-2971:
-

Thanks for assigning me to this issue. I've started working on it, but I 
encountered a problem I wanted to discuss. I wanted to enhance {{Table}} with 
methods like:
{code}
  def join(right: Table): Table
  def join(right: Table, onColumns: String): Table
  def join(right: Table, onColumns: Expression): Table 
  def join(right: Table, onColumns: Expression, joinType: JoinType): Table 
{code}
Those methods enable users to specify the join condition and joinType (a join 
condition pushed down from {{filter}} or {{where}} disables the outer joins). 
Unfortunately, right now it is impossible to resolve an 
{{UnresolvedFieldReference}} to a field of one of many inputs. I proposed a 
solution (table.scala:437-447), but I am not sure if it is elegant enough.

My WIP branch: https://github.com/dawidwys/flink/tree/outerJoin

> Add outer joins to the Table API
> 
>
> Key: FLINK-2971
> URL: https://issues.apache.org/jira/browse/FLINK-2971
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, 
> right and full outer joins.
> Given that null values are properly supported by RowSerializer etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations

2016-04-27 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-3086.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed in 16059ea.

> ExpressionParser does not support concatenation of suffix operations
> 
>
> Key: FLINK-3086
> URL: https://issues.apache.org/jira/browse/FLINK-3086
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.1.0
>
>
> The ExpressionParser of the Table API does not support concatenation of 
> suffix operations, e.g. 
> {code}table.select("field.cast(STRING).substring(2)"){code} throws an 
> exception.
> {code}
> org.apache.flink.api.table.ExpressionException: Could not parse expression: 
> string matching regex `\z' expected but `.' found
>   at 
> org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224)
> {code}
> However, the Scala implicit Table Expression API supports this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259967#comment-15259967
 ] 

ASF GitHub Bot commented on FLINK-3086:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1912


> ExpressionParser does not support concatenation of suffix operations
> 
>
> Key: FLINK-3086
> URL: https://issues.apache.org/jira/browse/FLINK-3086
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The ExpressionParser of the Table API does not support concatenation of 
> suffix operations, e.g. 
> {code}table.select("field.cast(STRING).substring(2)"){code} throws an 
> exception.
> {code}
> org.apache.flink.api.table.ExpressionException: Could not parse expression: 
> string matching regex `\z' expected but `.' found
>   at 
> org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224)
> {code}
> However, the Scala implicit Table Expression API supports this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3086] [table] ExpressionParser does not...

2016-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1912


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3086] [table] ExpressionParser does not...

2016-04-27 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1912#issuecomment-215047867
  
Ok, I will merge this ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259961#comment-15259961
 ] 

ASF GitHub Bot commented on FLINK-3086:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1912#issuecomment-215047867
  
Ok, I will merge this ;-)


> ExpressionParser does not support concatenation of suffix operations
> 
>
> Key: FLINK-3086
> URL: https://issues.apache.org/jira/browse/FLINK-3086
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The ExpressionParser of the Table API does not support concatenation of 
> suffix operations, e.g. 
> {code}table.select("field.cast(STRING).substring(2)"){code} throws an 
> exception.
> {code}
> org.apache.flink.api.table.ExpressionException: Could not parse expression: 
> string matching regex `\z' expected but `.' found
>   at 
> org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224)
> {code}
> However, the Scala implicit Table Expression API supports this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3771) Methods for translating Graphs

2016-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259927#comment-15259927
 ] 

ASF GitHub Bot commented on FLINK-3771:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1900#issuecomment-215040606
  
I think this is a very good idea, and we can wrap the user's MapFunction to 
function like a MapFunction.

Now we can start discussing algorithms :)


> Methods for translating Graphs
> --
>
> Key: FLINK-3771
> URL: https://issues.apache.org/jira/browse/FLINK-3771
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Provide methods for translation of the type or value of graph labels, vertex 
> values, and edge values.
> Sample use cases:
> * shifting graph labels in order to union generated graphs or graphs read 
> from multiple sources
> * downsizing labels or values since algorithms prefer to generate wide types 
> which may be expensive for further computation
> * changing label type for testing or benchmarking alternative code paths
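
A minimal usage sketch of the first use case (shifting labels before a union); 
the {{translateGraphIds}} method name is an assumption based on this issue, not 
a confirmed API:

{code}
// Hypothetical sketch: shift all graph IDs by an offset so that two generated
// graphs can be unioned without ID collisions.
import org.apache.flink.api.common.functions.MapFunction;

public class ShiftId implements MapFunction<Long, Long> {

    private final long offset;

    public ShiftId(long offset) {
        this.offset = offset;
    }

    @Override
    public Long map(Long id) {
        return id + offset;
    }
}

// assumed usage: Graph<Long, VV, EV> shifted = graph.translateGraphIds(new ShiftId(1_000_000L));
{code}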



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

2016-04-27 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1900#issuecomment-215040606
  
I think this is a very good idea, and we can wrap the user's MapFunction to 
function like a MapFunction.

Now we can start discussing algorithms :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61233316
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set hdPaths = new HashSet<>();
+   private Set hdPathNames = new HashSet<>();
+   private Map hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   private static final Object lock = new Object();
+
+   private boolean[] finished;
+
+   @Override
+   protected void testProgram() throws Exception {
+   FileCreator fileCreator = new FileCreator(INTERVAL);
+   Thread t = new Thread(fileCreator);
+   t.start();
+   Thread.sleep(100);
+
+   StringFileFormat format = new StringFileFormat();
+   format.setFilePath(hdfsURI);
+
+   Configuration config = new Configuration();
  

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61233284
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set hdPaths = new HashSet<>();
--- End diff --

These fields are not used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61232860
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private static File baseDir;
+
+   private static org.apache.hadoop.fs.FileSystem hdfs;
+   private static String hdfsURI;
+   private static MiniDFSCluster hdfsCluster;
+
+   //  PREPARING FOR THE TESTS
+
+   @BeforeClass
+   public static void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @AfterClass
+   public static void destroyHDFS() {
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileReadingOperator() throws Exception {
+   Set filesCreated = new HashSet<>();
+   Map fileContents = new HashMap<>();
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   fileContents.put(i, file.f1);
+   }
+
+   StringFileFormat format = new StringFileFormat();
+   Configuration config = new Configuration();
+   config.setString("input.file.path", hdfsURI);
+
+  

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61232298
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector(output);
+   this.checkpointLock = getContainingTask()
+   .getCheckpointLock();
+   this.serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+   this.reader = new SplitReader(unReadSplits, format, serializer, 
collector, checkpointLock);
+   this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord element) throws 
Exception {
+   LOG.info("Reading Split: " + element.getValue());
+   this.unReadSplits.a

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61231656
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector(output);
+   this.checkpointLock = getContainingTask()
+   .getCheckpointLock();
+   this.serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+   this.reader = new SplitReader(unReadSplits, format, serializer, 
collector, checkpointLock);
+   this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord element) throws 
Exception {
+   LOG.info("Reading Split: " + element.getValue());
+   this.unReadSplits.a

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61231507
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector(output);
+   this.checkpointLock = getContainingTask()
+   .getCheckpointLock();
+   this.serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+   this.reader = new SplitReader(unReadSplits, format, serializer, 
collector, checkpointLock);
+   this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord element) throws 
Exception {
+   LOG.info("Reading Split: " + element.getValue());
+   this.unReadSplits.a

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61231326
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
--- End diff --

I think the queue can be moved to the `SplitReader`; the split reader can 
then have a method `addSplit`. This would better isolate concerns.
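
A minimal sketch of that shape (purely illustrative, not the actual code in 
this PR):

```java
// Hypothetical sketch: the reader owns its queue and exposes addSplit(),
// so the operator never touches the queue directly.
import org.apache.flink.core.fs.FileInputSplit;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SplitReader extends Thread {

    private final Queue<FileInputSplit> pendingSplits = new ConcurrentLinkedQueue<>();

    void addSplit(FileInputSplit split) {
        pendingSplits.add(split);
    }

    @Override
    public void run() {
        // poll pendingSplits, read records with the FileInputFormat and emit them
    }
}
```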


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230608
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator extends 
AbstractStreamOperator implements OneInputStreamOperator {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector(output);
+   this.checkpointLock = getContainingTask()
+   .getCheckpointLock();
+   this.serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+   this.reader = new SplitReader(unReadSplits, format, serializer, 
collector, checkpointLock);
+   this.reader.start();
+   }
+
+   @Override
+   public void processElement(StreamRecord element) throws 
Exception {
+   LOG.info("Reading Split: " + element.getValue());
--- End diff --

Should

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230524
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator<OUT> extends 
AbstractStreamOperator<OUT> implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector(output);
--- End diff --

Superfluous `OUT` parameter, can be `<>`
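
A minimal sketch of the suggested change (it assumes the field is declared as 
`private transient TimestampedCollector<OUT> collector;`, so the compiler can 
infer the type argument from the assignment target):

    // explicit type argument, as presumably written in the PR:
    // it only repeats what the field declaration already states
    this.collector = new TimestampedCollector<OUT>(output);

    // diamond operator: the compiler infers <OUT> from the declaration of 'collector'
    this.collector = new TimestampedCollector<>(output);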


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230544
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the splits received from {@link 
FileSplitMonitoringFunction}.
+ * This operator will receive just the split descriptors and then read and 
emit records. This may lead
+ * to backpressure. To avoid this, we will have another thread actually 
reading the splits and
+ * another forwarding the checkpoint barriers. The two should sync so that 
the checkpoints reflect the
+ * current state.
+ * */
+public class FileSplitReadOperator<OUT> extends 
AbstractStreamOperator<OUT> implements OneInputStreamOperator<FileInputSplit, OUT> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   private Queue unReadSplits;
+
+   private transient SplitReader reader;
+   private transient Object checkpointLock;
+   private transient TimestampedCollector collector;
+
+   private Configuration configuration;
+   private FileInputFormat format;
+   private TypeInformation typeInfo;
+   private transient TypeSerializer serializer;
+
+   public FileSplitReadOperator(FileInputFormat format, 
TypeInformation typeInfo, Configuration configuration) {
+   this.format = format;
+   this.typeInfo = typeInfo;
+   this.configuration = configuration;
+   this.unReadSplits = new ConcurrentLinkedQueue<>();
+
+   // this is for the extra thread that is reading,
+   // the tests were not terminating because the success
+   // exception was caught by the extra thread, and never
+   // forwarded.
+
+   // for now, and to keep the extra thread to avoid backpressure,
+   // we just disable chaining for the operator, so that it runs on
+   // another thread. This will change later for a cleaner design.
+   setChainingStrategy(ChainingStrategy.NEVER);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   this.format.configure(configuration);
+   this.collector = new TimestampedCollector(output);
+   this.checkpointLock = getContainingTask()
+   .getCheckpointLock();
+   this.serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+   this.reader = new SplitReader(unReadSplits, format, serializer, 
collector, checkpointLock);
--- End diff --

Missing generic, can be `<>`
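
For reference, a sketch of the constructor call with the diamond added as 
suggested (it assumes `SplitReader` carries the same `<OUT>` type parameter as 
the `reader` field):

    // '<>' lets the compiler supply the type arguments instead of using the raw SplitReader type
    this.reader = new SplitReader<>(unReadSplits, format, serializer, collector, checkpointLock);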


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-27 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r61230370
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = false;
+
+   private Configuration configuration;
--- End diff --

We should explain why we have the `Configuration` here. That the 
`Configuration` that we get in `open()` is not valid. Same in the 
`FileSplitReadOperator`.
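
A hedged sketch of the kind of explanatory comment and usage that could 
accompany the field (the wording and the `open()` body are assumptions for 
illustration, not the PR author's code):

    /**
     * Configuration for the {@link FileInputFormat}, supplied at construction time.
     * Assumption for illustration: the Configuration passed to open(Configuration)
     * at runtime does not carry the format's parameters, so the function keeps
     * its own copy and configures the format with it.
     */
    private Configuration configuration;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // use the constructor-supplied configuration, not the 'parameters' argument
        format.configure(this.configuration);
    }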


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3834) flink-statebackend-rocksdb

2016-04-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3834:


 Summary: flink-statebackend-rocksdb
 Key: FLINK-3834
 URL: https://issues.apache.org/jira/browse/FLINK-3834
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3833) flink-test-utils

2016-04-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3833:


 Summary: flink-test-utils
 Key: FLINK-3833
 URL: https://issues.apache.org/jira/browse/FLINK-3833
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3832) flink-streaming-scala

2016-04-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3832:


 Summary: flink-streaming-scala
 Key: FLINK-3832
 URL: https://issues.apache.org/jira/browse/FLINK-3832
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3831) flink-streaming-java

2016-04-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3831:


 Summary: flink-streaming-java
 Key: FLINK-3831
 URL: https://issues.apache.org/jira/browse/FLINK-3831
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3830) flink-scala

2016-04-27 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3830:


 Summary: flink-scala
 Key: FLINK-3830
 URL: https://issues.apache.org/jira/browse/FLINK-3830
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

