[jira] [Created] (FLINK-3841) RocksDB statebackend creates empty dbs for stateless operators
Gyula Fora created FLINK-3841:
---------------------------------

             Summary: RocksDB statebackend creates empty dbs for stateless operators
                 Key: FLINK-3841
                 URL: https://issues.apache.org/jira/browse/FLINK-3841
             Project: Flink
          Issue Type: Bug
          Components: state backends
            Reporter: Gyula Fora
            Priority: Minor

Even though they are not checkpointed, there is always an open RocksDB database for all operators if the RocksDB backend is set. I wonder if it would make sense to lazily initialize the dbs instead of doing it in the initializeForJob method.
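A minimal sketch of the lazy-initialization idea, assuming a hypothetical `createDb` factory rather than the actual RocksDB backend code:

```scala
// Sketch only: defer RocksDB creation until the first state access instead of
// opening it eagerly in initializeForJob(). `createDb` stands in for whatever
// the backend does today; the names are illustrative, not Flink API.
class LazyDbHolder[DB](createDb: () => DB) {
  // Scala's lazy val is initialized thread-safely, at most once, on first use;
  // a stateless operator would therefore never open a database at all.
  lazy val db: DB = createDb()
}

object LazyDbHolder {
  def main(args: Array[String]): Unit = {
    val holder = new LazyDbHolder(() => { println("opening db"); new Object })
    // Nothing has been printed yet: no state access, no database.
    holder.db // "opening db" happens here, on first access only
  }
}
```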
[jira] [Created] (FLINK-3840) RocksDB local parent dir is polluted with empty folders with random names
Gyula Fora created FLINK-3840:
---------------------------------

             Summary: RocksDB local parent dir is polluted with empty folders with random names
                 Key: FLINK-3840
                 URL: https://issues.apache.org/jira/browse/FLINK-3840
             Project: Flink
          Issue Type: Bug
          Components: state backends
            Reporter: Gyula Fora

For some reason, when the job starts, the RocksDB root dir is filled with hundreds of empty folders with random names like:

041da1c-5fec-42ed-a69c-298240f1a065
4e6061aa-0c69-4755-a1ad-5ac4dec1d3f0
a7004bd1-778c-4a0f-96d4-9941208d1888
00db8406-6cb4-46ad-aac9-beeaa3247d16
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261609#comment-15261609 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61378731 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala --- @@ -215,7 +215,7 @@ class ScalarFunctionsTest { } - @Test + @Ignore --- End diff -- `exp`, `log`, `pow` and `ln` should have `Double` as input. What I was thinking is that we should add some extra type-coercion rules and insert a `cast` when we can do it safely (when an expression expects a `Double` but we provide an `Int`), for example `Byte` to `Double`, `Long` to `Double` and so on, and rewrite the expression as `pow(cast(x, Double))`.

> Add a validation phase before construct RelNode using TableAPI
> --------------------------------------------------------------
>
>                 Key: FLINK-3754
>                 URL: https://issues.apache.org/jira/browse/FLINK-3754
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API
>    Affects Versions: 1.0.0
>            Reporter: Yijie Shen
>            Assignee: Yijie Shen
>
> Unlike a SQL string's execution, which has a separate validation phase before RelNode construction, the Table API lacks a counterpart and its validation is scattered across many places.
> I suggest adding a single validation phase to detect problems as early as possible.
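The coercion idea above, condensed into standalone Scala with invented expression classes (the real flink-table expression hierarchy differs):

```scala
// Sketch of a safe numeric-widening rule: when a function expects Double but
// the argument is a narrower numeric type, wrap the argument in a cast instead
// of rejecting the expression. All class names here are illustrative.
sealed trait Expr { def dataType: String }
case class Ref(name: String, dataType: String) extends Expr
case class Cast(child: Expr, dataType: String) extends Expr
case class Pow(base: Expr, exponent: Expr) extends Expr { val dataType = "Double" }

object Coercion {
  // Widenings considered lossless enough to insert implicitly.
  private val safeWidening =
    Set(("Byte", "Double"), ("Short", "Double"), ("Int", "Double"), ("Long", "Double"))

  // Wrap the argument in a cast when it can be widened safely; otherwise leave
  // it alone and let validation report the type error.
  def widen(e: Expr, expected: String): Expr =
    if (e.dataType == expected) e
    else if (safeWidening((e.dataType, expected))) Cast(e, expected)
    else e

  def main(args: Array[String]): Unit = {
    // pow(x, y) with an Int base is rewritten to pow(cast(x, Double), y).
    println(Pow(widen(Ref("x", "Int"), "Double"), Ref("y", "Double")))
  }
}
```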
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261600#comment-15261600 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61378085 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala --- @@ -17,13 +17,34 @@ */ package org.apache.flink.api.table.expressions -import java.util.concurrent.atomic.AtomicInteger - import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -abstract class Expression extends TreeNode[Expression] { self: Product => - def name: String = Expression.freshName("expression") +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate.ExprValidationResult + +abstract class Expression extends TreeNode[Expression] { + /** +* Returns the [[TypeInformation]] for evaluating this expression. +* It is sometimes available until the expression is valid. +*/ + def dataType: TypeInformation[_] + + /** +* One pass validation of the expression tree in post order. +*/ + lazy val valid: Boolean = childrenValid && validateInput().isSuccess + + def childrenValid: Boolean = children.forall(_.valid) + + /** +* Check input data types, inputs number or other properties specified by this expression. +* Return `ValidationSuccess` if it pass the check, +* or `ValidationFailure` with supplement message explaining the error. +* Note: we should only call this method until `childrenValidated == true` --- End diff -- Ah, yes!
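The validation contract from this diff, as a self-contained sketch (the result type is a stand-in; the real `ExprValidationResult` lives in `org.apache.flink.api.table.validate`):

```scala
// Post-order validation: a node may only inspect its children's types after
// all children have validated successfully, hence the childrenValid guard.
sealed trait ValidationResult { def isSuccess: Boolean }
case object ValidationSuccess extends ValidationResult { val isSuccess = true }
case class ValidationFailure(message: String) extends ValidationResult { val isSuccess = false }

abstract class Node {
  def children: Seq[Node]
  def validateInput(): ValidationResult
  def childrenValid: Boolean = children.forall(_.valid)
  // Evaluated lazily and only once; short-circuits before validateInput()
  // runs if any child is invalid.
  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
}
```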
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1926#issuecomment-215254090 Thanks for the update @dawidwys. The PR looks good except for a few minor comments. Can you also rebase to the current master? A recent commit updated some code that your PR is touching (`DataSetScan`). Then it should be good to merge.

> Add orderBy() to Table API
> --------------------------
>
>                 Key: FLINK-2946
>                 URL: https://issues.apache.org/jira/browse/FLINK-2946
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Timo Walther
>            Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table API's code generation facilities, the Table API needs a sorting feature.
> I will implement it in the next days. Ideas on how to implement such a sorting feature are very welcome. Is there a more efficient way than {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the nodes first and finally sort on one node afterwards?
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261106#comment-15261106 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r61349569 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + def getExecutionEnvironment = { +val env = ExecutionEnvironment.getExecutionEnvironment +env.setParallelism(4) +env + } + + val tupleDataSetStrings = List((1, 1L, "Hi") +,(2, 2L, "Hello") +,(3, 2L, "Hello world") +,(4, 3L, "Hello world, how are you?") +,(5, 3L, "I am fine.") +,(6, 3L, "Luke Skywalker") +,(7, 4L, "Comment#1") +,(8, 4L, "Comment#2") +,(9, 4L, "Comment#3") +,(10, 4L, "Comment#4") +,(11, 5L, "Comment#5") +,(12, 5L, "Comment#6") +,(13, 5L, "Comment#7") +,(14, 5L, "Comment#8") +,(15, 5L, "Comment#9") +,(16, 6L, "Comment#10") +,(17, 6L, "Comment#11") +,(18, 6L, "Comment#12") +,(19, 6L, "Comment#13") +,(20, 6L, "Comment#14") +,(21, 6L, "Comment#15")) + + @Test + def testOrderByDesc(): Unit = { +val env = getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val ds = CollectionDataSets.get3TupleDataSet(env) +val t = ds.toTable(tEnv).orderBy('_1.desc) +implicit def rowOrdering[T <: Product] = Ordering.by((x : T) => + - x.productElement(0).asInstanceOf[Int]) + +val expected = sortExpectedly(tupleDataSetStrings) +val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + +val result = results.filterNot(_.isEmpty).sortBy(p => p.min).reduceLeft(_ ++ _) --- End diff -- `p.min` should be `p.head` because p should already be sorted (same applies for the other tests as well). 
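The reasoning behind the `p.head` suggestion, as a tiny standalone example (illustrative data, not the test's):

```scala
// Each partition arrives already sorted (sortPartition ran on it), so its
// first element is its minimum; sorting partitions by head and concatenating
// reproduces the total order established by range partitioning.
object HeadVsMin {
  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(4, 5, 6), Seq(1, 2, 3)) // sorted within, unordered across
    val total = partitions.sortBy(_.head).reduceLeft(_ ++ _)
    println(total) // List(1, 2, 3, 4, 5, 6)
  }
}
```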
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261103#comment-15261103 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r61349399 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.MapRunner + +package object dataset { + + private[dataset] def getConversionMapper(config: TableConfig, + inputType: TypeInformation[Any], + expectedType: TypeInformation[Any], + conversionOperatorName: String, + fieldNames: Seq[String], + inputPojoFieldMapping: Option[Array[Int]] = None) + : MapFunction[Any, Any] = { + +val generator = new CodeGenerator(config, --- End diff -- move first parameter to next line
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261105#comment-15261105 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r61349443 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort} + +class DataSetSortRule + extends ConverterRule( +classOf[LogicalSort], +Convention.NONE, +DataSetConvention.INSTANCE, +"FlinkSortRule") { --- End diff -- "FlinkSortRule" -> "DataSetSortRule"
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261099#comment-15261099 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r61349224 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.MapRunner + +package object dataset { + + private[dataset] def getConversionMapper(config: TableConfig, + inputType: TypeInformation[Any], --- End diff -- We follow a different indentation style and also move the first parameter to the next line.
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261097#comment-15261097 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r61348923 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/package.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.MapRunner + +package object dataset { + + private[dataset] def getConversionMapper(config: TableConfig, --- End diff -- Can you move this method to `DataSetRel`?
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r61348811 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.typeutils.PojoTypeInfo +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConverters._ + +class DataSetSort( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inp: RelNode, +collations: RelCollation, +rowType2: RelDataType) + extends SingleRel(cluster, traitSet, inp) + with DataSetRel{ + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={ +new DataSetSort( + cluster, + traitSet, + inputs.get(0), + collations, + rowType2 +) + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = { + +val config = tableEnv.getConfig + +val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + +val currentParallelism = inputDS.getExecutionEnvironment.getParallelism +var partitionedDs = if (currentParallelism == 1) { + inputDS +} else { + inputDS.partitionByRange(fieldCollations.map(_._1): _*) +.withOrders(fieldCollations.map(_._2): _*) +} + +fieldCollations.foreach { fieldCollation => + partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2) +} + +val inputType = partitionedDs.getType +expectedType match { + + case None if config.getEfficientTypeUsage => +partitionedDs + + case _ => +val determinedType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +// conversion +if (determinedType != inputType) { + + val mapFunc = getConversionMapper(config, +partitionedDs.getType, +determinedType, +"DataSetSortConversion", +getRowType.getFieldNames.asScala + ) + + partitionedDs.map(mapFunc) +} +// no conversion necessary, forward +else { + partitionedDs +} +} + } + + private def directionToOrder(direction: Direction) = { +direction match { + case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING + case Direction.DESCENDING | 
Direction.STRICTLY_DESCENDING => Order.DESCENDING +} + + } + + private val fieldCollations = collations.getFieldCollations.asScala +.map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) + + private val sortFieldsToString = fieldCollations +.map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") + + override def toString: String = s"Sort(by: $sortFieldsToString)" + + override def explainTerms(pw: RelWriter) : RelWriter = { +super.explainTerms(pw) + .item("by", sortFieldsToString)
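What `translateToPlan` builds, reduced to plain DataSet API calls: a sketch mirroring the `partitionByRange(...).withOrders(...)` and `sortPartition(...)` calls in the diff above, on made-up tuple data.

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2

// Range partitioning orders the partitions relative to each other; sortPartition
// then orders records inside each partition, giving a totally ordered result.
object SortPlanSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(
      Tuple2.of(Int.box(3), "c"), Tuple2.of(Int.box(1), "a"), Tuple2.of(Int.box(2), "b"))
    val sorted = input
      .partitionByRange(0)               // global order across partitions
      .withOrders(Order.ASCENDING)       // direction for the range partitioner
      .sortPartition(0, Order.ASCENDING) // local order within each partition
    sorted.print()
  }
}
```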
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261092#comment-15261092 ] ASF GitHub Bot commented on FLINK-2946: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r61348408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions +import org.apache.calcite.rex.RexNode +import org.apache.calcite.tools.RelBuilder + +abstract class Ordering extends UnaryExpression { self: Product => +} + +case class Asc(child: Expression) extends Ordering{ --- End diff -- `Ordering{` -> `Ordering {`
[jira] [Commented] (FLINK-1827) Move test classes in test folders and fix scope of test dependencies
[ https://issues.apache.org/jira/browse/FLINK-1827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260793#comment-15260793 ] ASF GitHub Bot commented on FLINK-1827: --- Github user thvasilo commented on the pull request: https://github.com/apache/flink/pull/1915#issuecomment-215201773 Any idea what might be causing the error in FlinkML? I can't pinpoint it to any specific source file.

> Move test classes in test folders and fix scope of test dependencies
> ---------------------------------------------------------------------
>
>                 Key: FLINK-1827
>                 URL: https://issues.apache.org/jira/browse/FLINK-1827
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>    Affects Versions: 0.9
>            Reporter: Flavio Pompermaier
>            Priority: Minor
>              Labels: test-compile
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Right now it is not possible to avoid compiling test classes (-Dmaven.test.skip=true) because some projects (e.g. flink-test-utils) require test classes in non-test sources (e.g. scalatest_${scala.binary.version}).
> Test classes should be moved to src/test/java (if Java) or src/test/scala (if Scala), and test dependencies should use scope=test.
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260573#comment-15260573 ] ASF GitHub Bot commented on FLINK-3771: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215167381 The switch to `MapFunction` means that a user can use a `RichMapFunction` with open, close, accumulators, etc. Rather than add six new methods to `Graph`, I implemented these as three `GraphAlgorithm`s which use the Gellyish `Graph.run(GraphAlgorithm)`.

> Methods for translating Graphs
> ------------------------------
>
>                 Key: FLINK-3771
>                 URL: https://issues.apache.org/jira/browse/FLINK-3771
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 1.1.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>             Fix For: 1.1.0
>
> Provide methods for translating the type or value of graph labels, vertex values, and edge values.
> Sample use cases:
> * shifting graph labels in order to union generated graphs or graphs read from multiple sources
> * downsizing labels or values, since algorithms prefer to generate wide types which may be expensive for further computation
> * changing the label type for testing or benchmarking alternative code paths
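A sketch of the `Graph.run(GraphAlgorithm)` packaging described above, with an invented algorithm name (the PR's actual translator classes and signatures may differ):

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.{Graph, GraphAlgorithm, Vertex}

// Packaging a value translation as a GraphAlgorithm keeps Graph's surface
// small: callers write graph.run(new VertexValuesToString) instead of Graph
// growing six new translate methods. A RichMapFunction with open()/close()
// could be substituted for the plain MapFunction below.
class VertexValuesToString[K]
  extends GraphAlgorithm[K, java.lang.Long, java.lang.Double, Graph[K, String, java.lang.Double]] {

  override def run(input: Graph[K, java.lang.Long, java.lang.Double]): Graph[K, String, java.lang.Double] =
    input.mapVertices(new MapFunction[Vertex[K, java.lang.Long], String] {
      override def map(vertex: Vertex[K, java.lang.Long]): String = vertex.getValue.toString
    })
}
```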
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260493#comment-15260493 ] Dawid Wysakowicz commented on FLINK-2971: --- I encountered another problem regarding null handling: right now the generated code for the FlatJoinFunction fails when one of the input elements is null. I saw some issues about null handling; is somebody working on that right now? Have any decisions been made about null handling?

> Add outer joins to the Table API
> --------------------------------
>
>                 Key: FLINK-2971
>                 URL: https://issues.apache.org/jira/browse/FLINK-2971
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Timo Walther
>            Assignee: Dawid Wysakowicz
>
> Since Flink now supports outer joins, the Table API can also support left, right and full outer joins, given that null values are properly supported by RowSerializer etc.
[jira] [Created] (FLINK-3839) Support wildcards in classpath parameters
Ken Krugler created FLINK-3839:
----------------------------------

             Summary: Support wildcards in classpath parameters
                 Key: FLINK-3839
                 URL: https://issues.apache.org/jira/browse/FLINK-3839
             Project: Flink
          Issue Type: Improvement
            Reporter: Ken Krugler
            Priority: Minor

Currently you can only specify a single explicit jar with the CLI --classpath file://<path> parameter. Java (since 1.6) has allowed you to use -cp <dir>/* as a way of adding every file that ends in .jar in a directory. This would simplify things; e.g. when running on EMR you have to add roughly 120 jars explicitly, but these are all located in just two directories.
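For comparison, this is the JVM behavior being referenced: since Java 6, `java -cp "lib/*" MyMain` expands `lib/*` to every file ending in .jar inside `lib` (the quotes keep the shell from expanding the wildcard first); `MyMain` and `lib` are placeholder names here.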
[jira] [Created] (FLINK-3838) CLI parameter parser is munging application params
Ken Krugler created FLINK-3838:
----------------------------------

             Summary: CLI parameter parser is munging application params
                 Key: FLINK-3838
                 URL: https://issues.apache.org/jira/browse/FLINK-3838
             Project: Flink
          Issue Type: Bug
          Components: Command-line client
    Affects Versions: 1.0.2
            Reporter: Ken Krugler
            Priority: Minor

If parameters for an application use a single '-' (e.g. -maxtasks), then the CLI argument parser will munge these, and the app gets passed either just the parameter name (e.g. 'maxtasks') if the start of the parameter doesn't match a Flink parameter, or two values, with the first value being the part that matched (e.g. '-m') and the second value being the rest (e.g. 'axtasks'). The parser should ignore everything after the jar path parameter. Note that using -- does seem to work.
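To illustrate with a hypothetical invocation: `flink run app.jar -maxtasks 5` would be consumed by the CLI as the Flink option `-m` followed by the value `axtasks`, whereas `flink run app.jar --maxtasks 5` passes the parameter through to the application untouched.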
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260456#comment-15260456 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-215142967 Hi @yjshen, thanks for your patience. I also finished a first pass over the PR. I'd like to propose a third alternative, in addition to the custom validation phase (this PR) and generating `SqlNode`s and using Calcite's validator. Both approaches would mean that the validation happens before the logical plan is translated into a `RelNode`. I think it would be good to eagerly check each method call of the Table API. This would make debugging easier, because exceptions would be thrown where the error is caused. Please correct me if I am wrong, but I think we would not lose validation coverage compared to this PR if we do eager validation? It might also be easier, because we do not need the recursive operator traversal (still the expression traversal, though). Maybe we can even directly translate to `RelNode`s after validation, just like we do right now. I think a lot of this PR could be used for eager validation; I am not sure it would be easily possible with the `SqlNode` validation approach. What do you think about eager validation, @yjshen and @twalthr? While reviewing the PR, I noticed that some classes seem to be partially derived from Spark's code base (e.g., `TreeNode` and `RuleExecutor`). I know there are some common patterns that apply in optimizers, but it is good style to give credit to the original source code. Can you list which classes are derived from Spark code and add a comment to them pointing to the source of the code? Thanks, Fabian
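Here is the eager-validation idea as a toy, with a hypothetical table class (not flink-table code):

```scala
// Eager validation: each API call checks its own inputs immediately, so the
// stack trace points at the offending call rather than at a later validation pass.
class ToyTable(val fieldNames: Seq[String]) {
  def select(fields: String*): ToyTable = {
    val unknown = fields.filterNot(fieldNames.contains)
    require(unknown.isEmpty, s"Unknown field(s): ${unknown.mkString(", ")}")
    new ToyTable(fields)
  }
}

object ToyTable {
  def main(args: Array[String]): Unit = {
    val t = new ToyTable(Seq("a", "b"))
    t.select("a")      // fine
    t.select("a", "c") // throws here, at the call site: Unknown field(s): c
  }
}
```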
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260447#comment-15260447 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61291959 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala --- @@ -215,7 +215,7 @@ class ScalarFunctionsTest { } - @Test + @Ignore --- End diff -- Why did you exclude these tests? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260442#comment-15260442 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61291865 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala --- @@ -17,13 +17,34 @@ */ package org.apache.flink.api.table.expressions -import java.util.concurrent.atomic.AtomicInteger - import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -abstract class Expression extends TreeNode[Expression] { self: Product => - def name: String = Expression.freshName("expression") +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate.ExprValidationResult + +abstract class Expression extends TreeNode[Expression] { + /** +* Returns the [[TypeInformation]] for evaluating this expression. +* It is sometimes available until the expression is valid. +*/ + def dataType: TypeInformation[_] + + /** +* One pass validation of the expression tree in post order. +*/ + lazy val valid: Boolean = childrenValid && validateInput().isSuccess + + def childrenValid: Boolean = children.forall(_.valid) + + /** +* Check input data types, inputs number or other properties specified by this expression. +* Return `ValidationSuccess` if it pass the check, +* or `ValidationFailure` with supplement message explaining the error. +* Note: we should only call this method until `childrenValidated == true` --- End diff -- `childrenValidated` -> `childrenValid`? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260439#comment-15260439 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61291698 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala --- @@ -17,13 +17,34 @@ */ package org.apache.flink.api.table.expressions -import java.util.concurrent.atomic.AtomicInteger - import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -abstract class Expression extends TreeNode[Expression] { self: Product => - def name: String = Expression.freshName("expression") +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate.ExprValidationResult + +abstract class Expression extends TreeNode[Expression] { + /** +* Returns the [[TypeInformation]] for evaluating this expression. +* It is sometimes available until the expression is valid. --- End diff -- +not ? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260438#comment-15260438 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61291621 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala --- @@ -355,4 +380,26 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + /** +* The primary workflow for executing plan validation for that generated from Table API. +* The validation is intentionally designed as a lazy procedure and triggered when we +* are going to run on Flink core. +*/ + class PlanPreparation(val env: TableEnvironment, val logical: LogicalNode) { + +lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical) + +def validate(): Unit = env.getValidator.validate(resolvedPlan) + +lazy val relNode: RelNode = { + env match { +case _: BatchTableEnvironment => --- End diff -- Why do you distinguish here? It's the same code, no? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260431#comment-15260431 ] Konstantin Knauf commented on FLINK-3669: - I made the changes as suggested, and will open a PR after the travis build later this evening. Unfortunately, I will not be able to test it with the original application (customer code). I could try to reproduce the issue locally and test it with the new version, but this will take at least till next weekend as I can only do that on my own time. > WindowOperator registers a lot of timers at StreamTask > -- > > Key: FLINK-3669 > URL: https://issues.apache.org/jira/browse/FLINK-3669 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.1 >Reporter: Aljoscha Krettek >Assignee: Konstantin Knauf >Priority: Blocker > > Right now, the WindowOperator registers a timer at the StreamTask for every > processing-time timer that a Trigger registers. We should combine several > registered trigger timers to only register one low-level timer (timer > coalescing). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260391#comment-15260391 ] Fabian Hueske commented on FLINK-3836: -- Sure, done. I also gave you contributor permissions for Flink's JIRA. You can now assign issues to yourself. Cheers, Fabian > Change Histogram to enable Long counters > > > Key: FLINK-3836 > URL: https://issues.apache.org/jira/browse/FLINK-3836 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.2 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > > Change > flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java > to enable Long counts instead of Integer. In particular, change the TreeMap > to be <Integer, Long>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-3836: - Assignee: Maximilian Bode > Change Histogram to enable Long counters > > > Key: FLINK-3836 > URL: https://issues.apache.org/jira/browse/FLINK-3836 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.2 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > > Change > flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java > to enable Long counts instead of Integer. In particular, change the TreeMap > to be <Integer, Long>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260382#comment-15260382 ] Maximilian Bode commented on FLINK-3836: I would like to take care of this; can someone assign it to me? > Change Histogram to enable Long counters > > > Key: FLINK-3836 > URL: https://issues.apache.org/jira/browse/FLINK-3836 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.2 >Reporter: Maximilian Bode >Priority: Minor > > Change > flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java > to enable Long counts instead of Integer. In particular, change the TreeMap > to be <Integer, Long>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260305#comment-15260305 ] Dawid Wysakowicz commented on FLINK-2971: - Yes, you're right. 9 methods. > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260304#comment-15260304 ] Fabian Hueske commented on FLINK-2971: -- Should be 9 methods because the outer joins require the predicates, right? > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260294#comment-15260294 ] Dawid Wysakowicz commented on FLINK-2971: - I started with that idea; the one small disadvantage I saw is that it will result in 4 (3 outer + 1 inner) * 3 (no joinPredicate, as Expression, as String) = 12 methods. But it would probably be more user-friendly. So if you're ok with it, I will change it. > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61273003 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; + + private transient SplitReader reader; + private transient Object checkpointLock; + private transient TimestampedCollector collector; + + private Configuration configuration; + private FileInputFormat format; + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) { + this.format = format; + this.typeInfo = typeInfo; + this.configuration = configuration; + this.unReadSplits = new ConcurrentLinkedQueue<>(); + + // this is for the extra thread that is reading, + // the tests were not terminating because the success + // exception was caught by the extra thread, and never + // forwarded. 
+ + // for now, and to keep the extra thread to avoid backpressure, + // we just disable chaining for the operator, so that it runs on + // another thread. This will change later for a cleaner design. + setChainingStrategy(ChainingStrategy.NEVER); + } + + @Override + public void open() throws Exception { + super.open(); + + this.format.configure(configuration); + this.collector = new TimestampedCollector(output); + this.checkpointLock = getContainingTask() + .getCheckpointLock(); + this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); + + this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock); + this.reader.start(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + LOG.info("Reading Split: " + element.getValue()); + this.unReadSplits.add(e
[jira] [Created] (FLINK-3837) Create FLOOR/CEIL function
Timo Walther created FLINK-3837: --- Summary: Create FLOOR/CEIL function Key: FLINK-3837 URL: https://issues.apache.org/jira/browse/FLINK-3837 Project: Flink Issue Type: Sub-task Components: Table API Reporter: Timo Walther Assignee: Timo Walther Priority: Minor Create the FLOOR/CEIL function for Table API and SQL. They will later be extended in FLINK-3580 to support date and time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
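For reference, SQL's FLOOR and CEIL round toward negative and positive infinity respectively, matching Java's Math.floor/Math.ceil rather than integer truncation (that the Table API code generation maps to these Math methods is an assumption):

{code}
public class FloorCeilDemo {
    public static void main(String[] args) {
        System.out.println(Math.floor(2.7));   //  2.0  (largest integer <= x)
        System.out.println(Math.ceil(2.3));    //  3.0  (smallest integer >= x)
        System.out.println(Math.floor(-2.3));  // -3.0  (note: not truncation)
        System.out.println(Math.ceil(-2.7));   // -2.0
    }
}
{code}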
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260280#comment-15260280 ] Vasia Kalavri commented on FLINK-2971: -- +1 I was thinking the same! > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260270#comment-15260270 ] Fabian Hueske commented on FLINK-2971: -- I think it would be more intuitive to offer explicit outer join methods ({{joinLeftOuter(right, joinPredicate)}}, {{joinRightOuter(right, joinPredicate)}}, and {{joinFullOuter(right, joinPredicate)}}) instead of overloading {{join}} with a join hint parameter. What do you think? > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
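A sketch of the proposed method surface (hypothetical signatures, not the API that was eventually merged; per the exchange above, requiring a predicate for the outer variants while keeping the three inner-join forms yields 3*2 + 3 = 9 methods):

{code}
// Illustrative only: explicit outer-join methods, each with an Expression and
// a String predicate overload, next to the existing inner-join variants.
interface Expression {}

public class Table {
    public Table join(Table right) { throw new UnsupportedOperationException("sketch"); }
    public Table join(Table right, String joinPredicate) { throw new UnsupportedOperationException("sketch"); }
    public Table join(Table right, Expression joinPredicate) { throw new UnsupportedOperationException("sketch"); }

    public Table joinLeftOuter(Table right, String joinPredicate) { throw new UnsupportedOperationException("sketch"); }
    public Table joinLeftOuter(Table right, Expression joinPredicate) { throw new UnsupportedOperationException("sketch"); }

    public Table joinRightOuter(Table right, String joinPredicate) { throw new UnsupportedOperationException("sketch"); }
    public Table joinRightOuter(Table right, Expression joinPredicate) { throw new UnsupportedOperationException("sketch"); }

    public Table joinFullOuter(Table right, String joinPredicate) { throw new UnsupportedOperationException("sketch"); }
    public Table joinFullOuter(Table right, Expression joinPredicate) { throw new UnsupportedOperationException("sketch"); }
}
{code}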
[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...
Github user gna-phetsarath commented on a diff in the pull request: https://github.com/apache/flink/pull/1920#discussion_r61269058 --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java --- @@ -289,6 +290,119 @@ public void testDeserializeToSpecificType() throws IOException { } } + /** +* Test if the AvroInputFormat is able to properly read data from an Avro +* file as a GenericRecord. +* +* @throws IOException, +* if there is an exception +*/ + @SuppressWarnings("unchecked") + @Test + public void testDeserialisationGenericRecord() throws IOException { + Configuration parameters = new Configuration(); + + AvroInputFormat format = new AvroInputFormat(new Path(testFile.getAbsolutePath()), + GenericRecord.class); + try { + format.configure(parameters); + FileInputSplit[] splits = format.createInputSplits(1); + assertEquals(splits.length, 1); + format.open(splits[0]); + + GenericRecord u = format.nextRecord(null); --- End diff -- ```AvroInputFormat``` does not have a ```nextRecord()``` method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260259#comment-15260259 ] Maximilian Bode commented on FLINK-3836: Good point, I guess this would be better. > Change Histogram to enable Long counters > > > Key: FLINK-3836 > URL: https://issues.apache.org/jira/browse/FLINK-3836 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.2 >Reporter: Maximilian Bode >Priority: Minor > > Change > flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java > to enable Long counts instead of Integer. In particular, change the TreeMap > to be <Integer, Long>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260255#comment-15260255 ] Greg Hogan commented on FLINK-3836: --- {{Histogram}} is marked as part of the {{@Public}} API, which can only be broken on new major releases. Could something like an equivalent {{LongHistogram}} be added instead? > Change Histogram to enable Long counters > > > Key: FLINK-3836 > URL: https://issues.apache.org/jira/browse/FLINK-3836 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.2 >Reporter: Maximilian Bode >Priority: Minor > > Change > flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java > to enable Long counts instead of Integer. In particular, change the TreeMap > to be <Integer, Long>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
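A minimal sketch of such a parallel class, assuming Flink's {{Accumulator}} contract (add/getLocalValue/resetLocal/merge/clone); this is illustrative, not the variant that was eventually merged:

{code}
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.common.accumulators.Accumulator;

/**
 * Hypothetical LongHistogram: same shape as Flink's Histogram accumulator,
 * but with long counts so a bucket cannot overflow at ~2^31 occurrences.
 */
public class LongHistogram implements Accumulator<Integer, TreeMap<Integer, Long>> {

    private TreeMap<Integer, Long> treeMap = new TreeMap<>();

    @Override
    public void add(Integer value) {
        // increment the bucket for this value, starting at 1
        treeMap.merge(value, 1L, Long::sum);
    }

    @Override
    public TreeMap<Integer, Long> getLocalValue() {
        return treeMap;
    }

    @Override
    public void resetLocal() {
        treeMap.clear();
    }

    @Override
    public void merge(Accumulator<Integer, TreeMap<Integer, Long>> other) {
        // bucket-wise addition of the other histogram's counts
        for (Map.Entry<Integer, Long> e : other.getLocalValue().entrySet()) {
            treeMap.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    @Override
    public Accumulator<Integer, TreeMap<Integer, Long>> clone() {
        LongHistogram result = new LongHistogram();
        result.treeMap = new TreeMap<>(treeMap);
        return result;
    }
}
{code}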
[jira] [Commented] (FLINK-3444) env.fromElements relies on the first input element for determining the DataSet/DataStream type
[ https://issues.apache.org/jira/browse/FLINK-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260250#comment-15260250 ] Chesnay Schepler commented on FLINK-3444: - It's the easiest and safest solution that gets the job done. At the time of the PR I already thought about your suggestion, but came to the conclusion that there can probably be cases where we would find the "wrong" super class. Then we would need the newly introduced method anyway. > env.fromElements relies on the first input element for determining the > DataSet/DataStream type > -- > > Key: FLINK-3444 > URL: https://issues.apache.org/jira/browse/FLINK-3444 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 0.10.0, 1.0.0 >Reporter: Vasia Kalavri > Fix For: 1.1.0 > > > The {{fromElements}} method of the {{ExecutionEnvironment}} and > {{StreamExecutionEnvironment}} determines the DataSet/DataStream type by > extracting the type of the first input element. > This is problematic if the first element is a subtype of another element in > the collection. > For example, the following > {code} > DataStream<Event> input = env.fromElements(new Event(1, "a"), new SubEvent(2, > "b")); > {code} > succeeds, while the following > {code} > DataStream<Event> input = env.fromElements(new SubEvent(1, "a"), new Event(2, > "b")); > {code} > fails with "java.lang.IllegalArgumentException: The elements in the > collection are not all subclasses of SubEvent". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3836) Change Histogram to enable Long counters
Maximilian Bode created FLINK-3836: -- Summary: Change Histogram to enable Long counters Key: FLINK-3836 URL: https://issues.apache.org/jira/browse/FLINK-3836 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.0.2 Reporter: Maximilian Bode Priority: Minor Change flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java to enable Long counts instead of Integer. In particular, change the TreeMap to be <Integer, Long>. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2828) Add interfaces for Table API input formats
[ https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-2828. Resolution: Implemented Fix Version/s: 1.1.0 Implemented for 1.1.0 with 4f5dbc2edcd3e3a403f2ecfe0cc0bdd95b26b177 > Add interfaces for Table API input formats > -- > > Key: FLINK-2828 > URL: https://issues.apache.org/jira/browse/FLINK-2828 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Fabian Hueske > Fix For: 1.1.0 > > > In order to support input formats for the Table API, interfaces are > necessary. I propose two types of TableSources: > - AdaptiveTableSources can adapt their output to the requirements of the > plan. Although the output schema stays the same, the TableSource can react on > field resolution and/or predicates internally and can return adapted > DataSet/DataStream versions in the "translate" step. > - StaticTableSources are an easy way to provide the Table API with additional > input formats without much implementation effort (e.g. for fromCsvFile()) > TableSources need to be deeply integrated into the Table API. > The TableEnvironment requires a newly introduced AbstractExecutionEnvironment > (common super class of all ExecutionEnvironments for DataSets and > DataStreams). > Here's what a TableSource can see from more complicated queries: > {code} > getTableJava(tableSource1) > .filter("a===5 || a===6") > .select("a as a4, b as b4, c as c4") > .filter("b4===7") > .join(getTableJava(tableSource2)) > .where("a===a4 && c==='Test' && c4==='Test2'") > // Result predicates for tableSource1: > // List("a===5 || a===6", "b===7", "c==='Test2'") > // Result predicates for tableSource2: > // List("c==='Test'") > // Result resolved fields for tableSource1 (true = filtering, > false=selection): > // Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), > ("c", true)) > // Result resolved fields for tableSource2 (true = filtering, > false=selection): > // Set(("a", true), ("c", true)) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-3835) JSON execution plan not helpful to debug plans with KeySelectors
[ https://issues.apache.org/jira/browse/FLINK-3835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-3835. Resolution: Fixed Fixed for 1.0.3 with 4a34f6f7342c3f40991a4d9e56f17811bad15d62 Fixed for 1.1.0 with 80b09eaefe8f000f79f017ba822a631b8c29b5be > JSON execution plan not helpful to debug plans with KeySelectors > > > Key: FLINK-3835 > URL: https://issues.apache.org/jira/browse/FLINK-3835 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.1.0, 1.0.2 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > Fix For: 1.1.0, 1.0.3 > > > The JSON execution plans are not helpful when debugging plans that include > join operators with key selectors. For joins with hash join strategy, the > driver strategy shows: {{"Hybrid Hash (build: Key Extractor)"}} where > {{(build: Key Extractor)}} shall help to identify the build side of the join. > However, if both inputs use KeySelectors, the build side cannot be identified. > I propose to add the operator id to the build side information. The same > issue applied for cross driver strategies. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats
[ https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260225#comment-15260225 ] ASF GitHub Bot commented on FLINK-2828: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1939 > Add interfaces for Table API input formats > -- > > Key: FLINK-2828 > URL: https://issues.apache.org/jira/browse/FLINK-2828 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Fabian Hueske > > In order to support input formats for the Table API, interfaces are > necessary. I propose two types of TableSources: > - AdaptiveTableSources can adapt their output to the requirements of the > plan. Although the output schema stays the same, the TableSource can react on > field resolution and/or predicates internally and can return adapted > DataSet/DataStream versions in the "translate" step. > - StaticTableSources are an easy way to provide the Table API with additional > input formats without much implementation effort (e.g. for fromCsvFile()) > TableSources need to be deeply integrated into the Table API. > The TableEnvironment requires a newly introduced AbstractExecutionEnvironment > (common super class of all ExecutionEnvironments for DataSets and > DataStreams). > Here's what a TableSource can see from more complicated queries: > {code} > getTableJava(tableSource1) > .filter("a===5 || a===6") > .select("a as a4, b as b4, c as c4") > .filter("b4===7") > .join(getTableJava(tableSource2)) > .where("a===a4 && c==='Test' && c4==='Test2'") > // Result predicates for tableSource1: > // List("a===5 || a===6", "b===7", "c==='Test2'") > // Result predicates for tableSource2: > // List("c==='Test'") > // Result resolved fields for tableSource1 (true = filtering, > false=selection): > // Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), > ("c", true)) > // Result resolved fields for tableSource2 (true = filtering, > false=selection): > // Set(("a", true), ("c", true)) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats
[ https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260223#comment-15260223 ] ASF GitHub Bot commented on FLINK-2828: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1939#issuecomment-215096897 Merging > Add interfaces for Table API input formats > -- > > Key: FLINK-2828 > URL: https://issues.apache.org/jira/browse/FLINK-2828 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Fabian Hueske > > In order to support input formats for the Table API, interfaces are > necessary. I propose two types of TableSources: > - AdaptiveTableSources can adapt their output to the requirements of the > plan. Although the output schema stays the same, the TableSource can react on > field resolution and/or predicates internally and can return adapted > DataSet/DataStream versions in the "translate" step. > - StaticTableSources are an easy way to provide the Table API with additional > input formats without much implementation effort (e.g. for fromCsvFile()) > TableSources need to be deeply integrated into the Table API. > The TableEnvironment requires a newly introduced AbstractExecutionEnvironment > (common super class of all ExecutionEnvironments for DataSets and > DataStreams). > Here's what a TableSource can see from more complicated queries: > {code} > getTableJava(tableSource1) > .filter("a===5 || a===6") > .select("a as a4, b as b4, c as c4") > .filter("b4===7") > .join(getTableJava(tableSource2)) > .where("a===a4 && c==='Test' && c4==='Test2'") > // Result predicates for tableSource1: > // List("a===5 || a===6", "b===7", "c==='Test2'") > // Result predicates for tableSource2: > // List("c==='Test'") > // Result resolved fields for tableSource1 (true = filtering, > false=selection): > // Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), > ("c", true)) > // Result resolved fields for tableSource2 (true = filtering, > false=selection): > // Set(("a", true), ("c", true)) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...
Github user gna-phetsarath commented on the pull request: https://github.com/apache/flink/pull/1920#issuecomment-215092007 ReflectDatumReader does not work with GenericRecord because it is an interface, so you need to use GenericDatumReader. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
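A minimal sketch of reading an Avro container file as {{GenericRecord}} with the generic reader; the file name "users.avro" and the "name" field are placeholders, not anything from the PR:

{code}
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class GenericAvroRead {
    public static void main(String[] args) throws IOException {
        // GenericDatumReader materializes records against the file's writer
        // schema; ReflectDatumReader would try to instantiate the reflected
        // class, which fails for the GenericRecord interface.
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        try (DataFileReader<GenericRecord> reader =
                new DataFileReader<>(new File("users.avro"), datumReader)) {
            GenericRecord record = null;
            while (reader.hasNext()) {
                record = reader.next(record); // reuses the record instance
                System.out.println(record.get("name"));
            }
        }
    }
}
{code}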
[GitHub] flink pull request: [Flink-3691] extend avroinputformat to support...
Github user gna-phetsarath commented on a diff in the pull request: https://github.com/apache/flink/pull/1920#discussion_r61261060 --- Diff: flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java --- @@ -119,12 +144,18 @@ public E nextRecord(E reuseValue) throws IOException { if (reachedEnd()) { return null; } - - if (!reuseAvroValue) { - reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class); + if (isGenericRecord) { --- End diff -- OK. I'll make the change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260171#comment-15260171 ] Aljoscha Krettek commented on FLINK-3669: - I think we have a winner now :-) Could you also test whether it solves your problem when running your job? It's also cool that you added a test for this. I changed it a bit to go through the complete checkpointing and operator lifecycle: {code} @Test public void testRestoreAndSnapshotAreInSync() throws Exception { final int WINDOW_SIZE = 3; final int WINDOW_SLIDE = 1; TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); ReducingStateDescriptor> stateDesc = new ReducingStateDescriptor<>("window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig())); WindowOperator, Tuple2, Tuple2, TimeWindow> operator = new WindowOperator<>( SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), EventTimeTrigger.create()); OneInputStreamOperatorTestHarness, Tuple2> testHarness = new OneInputStreamOperatorTestHarness<>(operator); testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); operator.setInputType(inputType, new ExecutionConfig()); testHarness.open(); WindowOperator.Timer timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L)); WindowOperator.Timer timer2 = new WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L)); WindowOperator.Timer timer3 = new WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L)); operator.processingTimeTimers.add(timer1); operator.processingTimeTimers.add(timer2); operator.processingTimeTimers.add(timer3); operator.processingTimeTimersQueue.add(timer1); operator.processingTimeTimersQueue.add(timer2); operator.processingTimeTimersQueue.add(timer3); operator.processingTimeTimerTimestamps.add(1L, 10); operator.processingTimeTimerTimestamps.add(2L, 5); operator.processingTimeTimerTimestamps.add(3L, 1); StreamTaskState snapshot = testHarness.snapshot(0, 0); WindowOperator, Tuple2, Tuple2, TimeWindow> otherOperator = new WindowOperator<>( SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), stateDesc, new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), EventTimeTrigger.create()); OneInputStreamOperatorTestHarness, Tuple2> otherTestHarness = new OneInputStreamOperatorTestHarness<>(otherOperator); otherTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); otherOperator.setInputType(inputType, new ExecutionConfig()); otherTestHarness.setup(); otherTestHarness.restore(snapshot, 0); otherTestHarness.open(); Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers); Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray()); Assert.assertEquals(operator.processingTimeTimerTimestamps, otherOperator.processingTimeTimerTimestamps); } {code} The test was good already but this is just something I would know since I wrote the test harnesses and the other operator tests. 
:-) This way, we also don't need the special {{restoreStateFrom}} and {{snapshotStateTo}} methods. Will you open a PR? > WindowOperator registers a lot of timers at StreamTask > -- > > Key: FLINK-3669 >
[jira] [Assigned] (FLINK-3834) flink-statebackend-rocksdb
[ https://issues.apache.org/jira/browse/FLINK-3834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-3834: --- Assignee: Chesnay Schepler > flink-statebackend-rocksdb > -- > > Key: FLINK-3834 > URL: https://issues.apache.org/jira/browse/FLINK-3834 > Project: Flink > Issue Type: Sub-task >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1284) Uniform random sampling operator over windows
[ https://issues.apache.org/jira/browse/FLINK-1284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260129#comment-15260129 ] Austin Ouyang commented on FLINK-1284: -- Hi [~till.rohrmann], Thanks! Really excited to get started on this. > Uniform random sampling operator over windows > - > > Key: FLINK-1284 > URL: https://issues.apache.org/jira/browse/FLINK-1284 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Paris Carbone >Assignee: Austin Ouyang >Priority: Minor > > It would be useful for several use cases to have a built-in uniform random > sampling operator in the streaming API that can operate on windows. This can > be used for example for online machine learning operations, evaluating > heuristics or continuous visualisation of representative values. > The operator could be given a field and a number of random samples needed, > following a window statement as such: > mystream.window(..).sample(fieldID,#samples) > Given that pre-aggregation is enabled, this could perhaps be implemented as a > binary reduce operator or a combinable groupreduce that pre-aggregates the > empiricals of that field. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260112#comment-15260112 ] Fabian Hueske commented on FLINK-2971: -- Hi Dawid, I think the approach is good. A few comments on the WIP branch: - I'd rather call the {{onColumns}} parameter {{joinPredicate}} to indicate that a predicate (boolean expression) is expected. - Does Calcite accept non-join predicates or do we have to check for that? - Is the partial function actually a partial function if the last case catches everything? - I think we can rename {{UnresolvedFieldReference}} to {{FieldReference}} and remove {{ResolvedFieldReference}}, which is not used anymore. Hence {{MultiInputUnresolvedFieldReference}} can also be shorter. Thanks for working on this! > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3444) env.fromElements relies on the first input element for determining the DataSet/DataStream type
[ https://issues.apache.org/jira/browse/FLINK-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260115#comment-15260115 ] Till Rohrmann commented on FLINK-3444: -- I'm wondering why the problem described above was solved by introducing an overloaded method {{fromElements}} which takes the {{Class}} of the elements as its first parameter. The problem is that if you do something like {{env.fromElements(SubClass.class, new ParentClass())}} then the vararg-only variant of {{fromElements}} will be called with type {{Object}} as result (since Object is the super type of {{Class}} and {{ParentClass}}). The reason is that {{ParentClass}} is not a subtype of {{SubClass}} and thus the new {{fromElements}} method is not applicable. Wouldn't it have been better to automatically infer the super type of all given elements? Usually the {{fromElements}} method is only called for very few elements and, thus, it should be feasible to iterate over all elements to extract the common super type. > env.fromElements relies on the first input element for determining the > DataSet/DataStream type > -- > > Key: FLINK-3444 > URL: https://issues.apache.org/jira/browse/FLINK-3444 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Affects Versions: 0.10.0, 1.0.0 >Reporter: Vasia Kalavri > Fix For: 1.1.0 > > > The {{fromElements}} method of the {{ExecutionEnvironment}} and > {{StreamExecutionEnvironment}} determines the DataSet/DataStream type by > extracting the type of the first input element. > This is problematic if the first element is a subtype of another element in > the collection. > For example, the following > {code} > DataStream<Event> input = env.fromElements(new Event(1, "a"), new SubEvent(2, > "b")); > {code} > succeeds, while the following > {code} > DataStream<Event> input = env.fromElements(new SubEvent(1, "a"), new Event(2, > "b")); > {code} > fails with "java.lang.IllegalArgumentException: The elements in the > collection are not all subclasses of SubEvent". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
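A sketch of that inference (a hypothetical helper, not Flink code; note it walks only the superclass chain, so, as Chesnay points out above, it can still settle on an unintended ancestor and it ignores interfaces):

{code}
import java.util.Arrays;
import java.util.List;

public final class CommonSuperType {

    /** Hypothetical helper: returns the most specific class that all elements
     *  are instances of, walking up from the first element's class.
     *  Assumes a non-empty list. */
    public static Class<?> infer(List<?> elements) {
        Class<?> candidate = elements.get(0).getClass();
        while (candidate != Object.class) {
            final Class<?> c = candidate;
            if (elements.stream().allMatch(c::isInstance)) {
                return c;
            }
            candidate = candidate.getSuperclass();
        }
        return Object.class;
    }

    public static void main(String[] args) {
        // with fromElements(new SubEvent(...), new Event(...)) this would
        // yield Event.class instead of failing on SubEvent.class
        System.out.println(infer(Arrays.asList(new SubEvent(), new Event())));
    }

    static class Event {}
    static class SubEvent extends Event {}
}
{code}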
[jira] [Resolved] (FLINK-3601) JobManagerTest times out on StopSignal test
[ https://issues.apache.org/jira/browse/FLINK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-3601. --- Resolution: Fixed Fix Version/s: 1.1.0 Thanks for the reminder! Had a fix already. Increased timeout with 130a22d6c8839018068a8a000b4a2ed6a1ab3d49. > JobManagerTest times out on StopSignal test > --- > > Key: FLINK-3601 > URL: https://issues.apache.org/jira/browse/FLINK-3601 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: test-stability > Fix For: 1.1.0 > > > {noformat} > testStopSignal(org.apache.flink.runtime.jobmanager.JobManagerTest) Time > elapsed: 8.018 sec <<< FAILURE! > java.lang.AssertionError: assertion failed: timeout (3 seconds) during > expectMsgClass waiting for class > org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess > at scala.Predef$.assert(Predef.scala:165) > at > akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423) > at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410) > at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718) > at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397) > at > org.apache.flink.runtime.jobmanager.JobManagerTest$2.(JobManagerTest.java:273) > at > org.apache.flink.runtime.jobmanager.JobManagerTest.testStopSignal(JobManagerTest.java:237) > {noformat} > I propose to replace the default timeout per {{expectMsg}} of 3 seconds with > a total test duration of 15 seconds. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
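For reference, the shape of that fix with Akka's JavaTestKit: one {{Within}} block bounds the whole scenario, and each {{expectMsg*}} call draws on the remaining budget instead of its own 3-second default. The surrounding actors and messages are assumptions mirroring the test; this is a sketch, not the actual commit:

{code}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.messages.JobManagerMessages;

public class StopSignalTimeoutSketch {

    // assumptions: a running ActorSystem, the JobManager's ActorRef and a
    // submit-job message, as set up in JobManagerTest itself
    static void expectJobResult(ActorSystem system, ActorRef jobManager, Object submitJob) {
        new JavaTestKit(system) {{
            // one 15-second budget for the whole scenario
            new Within(duration("15 seconds")) {
                @Override
                protected void run() {
                    jobManager.tell(submitJob, getRef());
                    // further expectMsg* calls for intermediate messages would
                    // share the same 15-second window
                    expectMsgClass(JobManagerMessages.JobResultSuccess.class);
                }
            };
        }};
    }
}
{code}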
[jira] [Closed] (FLINK-3824) ResourceManager may repeatedly connect to outdated JobManager in HA mode
[ https://issues.apache.org/jira/browse/FLINK-3824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-3824. - Resolution: Fixed Fixed with 7ec1300a740e9b8e1eb595bc15e752a167592e00. > ResourceManager may repeatedly connect to outdated JobManager in HA mode > > > Key: FLINK-3824 > URL: https://issues.apache.org/jira/browse/FLINK-3824 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 1.1.0 > > > When the ResourceManager receives a new leading JobManager via the > LeaderRetrievalService it tries to register with this JobManager until > connected. If during registration a new leader gets elected, the > ResourceManager may still repeatedly try to register with the old one. This > doesn't affect the registration with the new JobManager but leaves error > messages in the log file and may process unnecessary messages. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260086#comment-15260086 ] Aljoscha Krettek commented on FLINK-3669: - Thanks! I'll have a look at your branch. For the test-stability, I think there is some problem with the maven caches on Travis. We're waiting for the Infra team to respond. > WindowOperator registers a lot of timers at StreamTask > -- > > Key: FLINK-3669 > URL: https://issues.apache.org/jira/browse/FLINK-3669 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.1 >Reporter: Aljoscha Krettek >Assignee: Konstantin Knauf >Priority: Blocker > > Right now, the WindowOperator registers a timer at the StreamTask for every > processing-time timer that a Trigger registers. We should combine several > registered trigger timers to only register one low-level timer (timer > coalescing). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3835) JSON execution plan not helpful to debug plans with KeySelectors
Fabian Hueske created FLINK-3835: Summary: JSON execution plan not helpful to debug plans with KeySelectors Key: FLINK-3835 URL: https://issues.apache.org/jira/browse/FLINK-3835 Project: Flink Issue Type: Bug Components: Optimizer Affects Versions: 1.0.2, 1.1.0 Reporter: Fabian Hueske Assignee: Fabian Hueske Fix For: 1.1.0, 1.0.3 The JSON execution plans are not helpful when debugging plans that include join operators with key selectors. For joins with the hash join strategy, the driver strategy shows {{"Hybrid Hash (build: Key Extractor)"}}, where {{(build: Key Extractor)}} is meant to help identify the build side of the join. However, if both inputs use KeySelectors, the build side cannot be identified. I propose to add the operator id to the build side information. The same issue applies to cross driver strategies. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
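A small illustration of the proposed change, under the assumption that the build-side hint is a formatted string (the helper method below is hypothetical): appending the operator id keeps two {{Key Extractor}} inputs distinguishable.
{code}
public class BuildSideHintSketch {

    // Hypothetical helper: include the operator id in the build-side hint.
    static String driverStrategy(String strategy, String buildSideName, int buildSideId) {
        return strategy + " (build: " + buildSideName + " (id: " + buildSideId + "))";
    }

    public static void main(String[] args) {
        // prints: Hybrid Hash (build: Key Extractor (id: 3))
        System.out.println(driverStrategy("Hybrid Hash", "Key Extractor", 3));
    }
}
{code}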
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260016#comment-15260016 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-215061267 @yjshen I looked through the code changes. I was quite impressed that your PR touches nearly every class of the current API. Basically, your changes seem to work; however, I'm not sure if we want to implement the validation phase for every scalar function ourselves. Actually, Calcite already comes with type inference, type checking and validation capabilities. I don't know if we want to reinvent the wheel at this point. Your approach inserts a layer under the Table API for doing the validation. However, this layer could instead translate the plan into a SQL tree (on top of RelNodes). We could then let Calcite do the work of validation. This could also solve another problem that I faced when working on FLINK-3580. If you take a look at `StandardConvertletTable` of Calcite, you see that Calcite also does some conversions that we would otherwise need to implement ourselves if we do not base the Table API on top of SQL. We need to discuss how we want to proceed. Neither solution is perfect. > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-215061267 @yjshen I looked through the code changes. I was quite impressed that your PR touches nearly every class of the current API. Basically, your changes seem to work; however, I'm not sure if we want to implement the validation phase for every scalar function ourselves. Actually, Calcite already comes with type inference, type checking and validation capabilities. I don't know if we want to reinvent the wheel at this point. Your approach inserts a layer under the Table API for doing the validation. However, this layer could instead translate the plan into a SQL tree (on top of RelNodes). We could then let Calcite do the work of validation. This could also solve another problem that I faced when working on FLINK-3580. If you take a look at `StandardConvertletTable` of Calcite, you see that Calcite also does some conversions that we would otherwise need to implement ourselves if we do not base the Table API on top of SQL. We need to discuss how we want to proceed. Neither solution is perfect. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
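For context, a minimal sketch of the Calcite-based alternative (illustrative code against Calcite's public {{Planner}} API, not Flink code; a real setup would register the queried tables in the schema): parsing, validation, and conversion to RelNodes are all delegated to Calcite.
{code}
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalciteValidationSketch {

    public static RelRoot validateAndConvert(String sql) throws Exception {
        // An empty root schema; a real setup would register the tables
        // that the query refers to.
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema)
                .build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode parsed = planner.parse(sql);
        // Calcite performs type inference, type checking and validation here --
        // the phase the Table API would otherwise have to reimplement itself.
        SqlNode validated = planner.validate(parsed);
        return planner.rel(validated);
    }
}
{code}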
[jira] [Commented] (FLINK-3601) JobManagerTest times out on StopSignal test
[ https://issues.apache.org/jira/browse/FLINK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260002#comment-15260002 ] Matthias J. Sax commented on FLINK-3601: [~mxm] Are you working on this? If not, I could fix this. > JobManagerTest times out on StopSignal test > --- > > Key: FLINK-3601 > URL: https://issues.apache.org/jira/browse/FLINK-3601 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: test-stability > > {noformat} > testStopSignal(org.apache.flink.runtime.jobmanager.JobManagerTest) Time > elapsed: 8.018 sec <<< FAILURE! > java.lang.AssertionError: assertion failed: timeout (3 seconds) during > expectMsgClass waiting for class > org.apache.flink.runtime.messages.JobManagerMessages$JobResultSuccess > at scala.Predef$.assert(Predef.scala:165) > at > akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:423) > at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410) > at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718) > at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397) > at > org.apache.flink.runtime.jobmanager.JobManagerTest$2.(JobManagerTest.java:273) > at > org.apache.flink.runtime.jobmanager.JobManagerTest.testStopSignal(JobManagerTest.java:237) > {noformat} > I propose to replace the default timeout per {{expectMsg}} of 3 seconds with > a total test duration of 15 seconds. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3380) Unstable Test: JobSubmissionFailsITCase
[ https://issues.apache.org/jira/browse/FLINK-3380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526 ] Matthias J. Sax commented on FLINK-3380: [~uce] Can this be closed? PR1611 was merged long ago. > Unstable Test: JobSubmissionFailsITCase > --- > > Key: FLINK-3380 > URL: https://issues.apache.org/jira/browse/FLINK-3380 > Project: Flink > Issue Type: Bug >Reporter: Kostas Kloudas >Priority: Critical > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/108034635/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2971) Add outer joins to the Table API
[ https://issues.apache.org/jira/browse/FLINK-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259992#comment-15259992 ] Dawid Wysakowicz commented on FLINK-2971: - Thanks for assigning me to this issue. I've started working on it, but I encountered a problem I wanted to discuss. I wanted to enhance {{Table}} with methods like: {code} def join(right: Table): Table def join(right: Table, onColumns: String): Table def join(right: Table, onColumns: Expression): Table def join(right: Table, onColumns: Expression, joinType: JoinType): Table {code} These methods enable users to specify the join condition and join type (a join condition pushed down from {{filter}} or {{where}} disables outer joins). Unfortunately, right now it is impossible to resolve an {{UnresolvedFieldReference}} to a field of one of many inputs. I proposed a solution (table.scala:437-447), but I am not sure it is elegant enough. My WIP branch: https://github.com/dawidwys/flink/tree/outerJoin > Add outer joins to the Table API > > > Key: FLINK-2971 > URL: https://issues.apache.org/jira/browse/FLINK-2971 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > Since Flink now supports outer joins, the Table API can also support left, > right and full outer joins. > Given that null values are properly supported by RowSerializer etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
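The resolution problem can be illustrated with a small standalone sketch (not Flink code): a field reference must be looked up in the fields of both join inputs, and a name occurring on both sides is ambiguous.
{code}
import java.util.Arrays;
import java.util.List;

public class FieldResolutionSketch {

    /** Returns 0 if the field belongs to the left input, 1 if to the right. */
    static int resolve(String field, List<String> leftFields, List<String> rightFields) {
        boolean inLeft = leftFields.contains(field);
        boolean inRight = rightFields.contains(field);
        if (inLeft && inRight) {
            throw new IllegalArgumentException("Ambiguous field reference: " + field);
        }
        if (!inLeft && !inRight) {
            throw new IllegalArgumentException("Unknown field reference: " + field);
        }
        return inLeft ? 0 : 1;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a", "b");
        List<String> right = Arrays.asList("b", "c");
        System.out.println(resolve("a", left, right)); // 0 -> left input
        System.out.println(resolve("c", left, right)); // 1 -> right input
        // resolve("b", left, right) would throw: ambiguous
    }
}
{code}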
[jira] [Resolved] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations
[ https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-3086. - Resolution: Fixed Fix Version/s: 1.1.0 Fixed in 16059ea. > ExpressionParser does not support concatenation of suffix operations > > > Key: FLINK-3086 > URL: https://issues.apache.org/jira/browse/FLINK-3086 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.1.0 > > > The ExpressionParser of the Table API does not support concatenation of > suffix operations. e.g. > {code}table.select("field.cast(STRING).substring(2)"){code} throws an > exception. > {code} > org.apache.flink.api.table.ExpressionException: Could not parse expression: > string matching regex `\z' expected but `.' found > at > org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224) > {code} > However, the Scala implicit Table Expression API supports this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
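A toy standalone sketch of the underlying fix (this is not the actual {{ExpressionParser}}, which is built on Scala parser combinators): suffix operations must be parsed as a repetition and folded around the primary expression, so that chains like {{cast(...).substring(...)}} are accepted.
{code}
public class SuffixChainSketch {

    public static void main(String[] args) {
        // prints: substring(cast(field, STRING), 2)
        System.out.println(parse("field.cast(STRING).substring(2)"));
    }

    static String parse(String input) {
        String[] parts = input.split("\\.");
        String expr = parts[0]; // the primary expression
        // rep(suffix): fold every ".name(arg)" suffix around the current expression
        // instead of accepting at most one suffix, which caused the original failure.
        for (int i = 1; i < parts.length; i++) {
            String suffix = parts[i];
            String name = suffix.substring(0, suffix.indexOf('('));
            String arg = suffix.substring(suffix.indexOf('(') + 1, suffix.indexOf(')'));
            expr = name + "(" + expr + ", " + arg + ")";
        }
        return expr;
    }
}
{code}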
[jira] [Commented] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations
[ https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259967#comment-15259967 ] ASF GitHub Bot commented on FLINK-3086: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1912 > ExpressionParser does not support concatenation of suffix operations > > > Key: FLINK-3086 > URL: https://issues.apache.org/jira/browse/FLINK-3086 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > The ExpressionParser of the Table API does not support concatenation of > suffix operations. e.g. > {code}table.select("field.cast(STRING).substring(2)"){code} throws an > exception. > {code} > org.apache.flink.api.table.ExpressionException: Could not parse expression: > string matching regex `\z' expected but `.' found > at > org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224) > {code} > However, the Scala implicit Table Expression API supports this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3086] [table] ExpressionParser does not...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1912 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3086] [table] ExpressionParser does not...
Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1912#issuecomment-215047867 Ok, I will merge this ;-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations
[ https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259961#comment-15259961 ] ASF GitHub Bot commented on FLINK-3086: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1912#issuecomment-215047867 Ok, I will merge this ;-) > ExpressionParser does not support concatenation of suffix operations > > > Key: FLINK-3086 > URL: https://issues.apache.org/jira/browse/FLINK-3086 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > The ExpressionParser of the Table API does not support concatenation of > suffix operations. e.g. > {code}table.select("field.cast(STRING).substring(2)"){code} throws an > exception. > {code} > org.apache.flink.api.table.ExpressionException: Could not parse expression: > string matching regex `\z' expected but `.' found > at > org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224) > {code} > However, the Scala implicit Table Expression API supports this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3771) Methods for translating Graphs
[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15259927#comment-15259927 ] ASF GitHub Bot commented on FLINK-3771: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215040606 I think this is a very good idea, and we can wrap the user's MapFunction to function like a MapFunction. Now we can start discussing algorithms :) > Methods for translating Graphs > -- > > Key: FLINK-3771 > URL: https://issues.apache.org/jira/browse/FLINK-3771 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Provide methods for translation of the type or value of graph labels, vertex > values, and edge values. > Sample use cases: > * shifting graph labels in order to union generated graphs or graphs read > from multiple sources > * downsizing labels or values since algorithms prefer to generate wide types > which may be expensive for further computation > * changing label type for testing or benchmarking alternative code paths -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...
Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/1900#issuecomment-215040606 I think this is a very good idea, and we can wrap the user's MapFunction to function like a MapFunction. Now we can start discussing algorithms :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
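A hedged sketch of the wrapping idea (the class name is hypothetical; Gelly's {{Vertex}} and Flink's {{MapFunction}} are real): the user's {{MapFunction}} on values is adapted to a {{MapFunction}} on whole vertices, so it can be applied with a plain {{DataSet.map()}} over {{graph.getVertices()}}.
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.graph.Vertex;

public class TranslateVertexValue<K, OLD, NEW> implements MapFunction<Vertex<K, OLD>, Vertex<K, NEW>> {

    private final MapFunction<OLD, NEW> translator;

    public TranslateVertexValue(MapFunction<OLD, NEW> translator) {
        this.translator = translator;
    }

    @Override
    public Vertex<K, NEW> map(Vertex<K, OLD> vertex) throws Exception {
        // Keep the vertex ID; translate only the value with the user's function.
        return new Vertex<>(vertex.getId(), translator.map(vertex.getValue()));
    }
}
{code}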
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61233316 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private File baseDir; + + private org.apache.hadoop.fs.FileSystem hdfs; + private String hdfsURI; + private MiniDFSCluster hdfsCluster; + + private Set hdPaths = new HashSet<>(); + private Set hdPathNames = new HashSet<>(); + private Map hdPathContents = new HashMap<>(); + + // PREPARING FOR THE TESTS + + @Before + public void createHDFS() { + try { + baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set. 
+ + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + + hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; + hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf); + + } catch(Throwable e) { + e.printStackTrace(); + Assert.fail("Test failed " + e.getMessage()); + } + } + + @After + public void destroyHDFS() { + FileUtil.fullyDelete(baseDir); + hdfsCluster.shutdown(); + } + + // END OF PREPARATIONS + + private static final Object lock = new Object(); + + private boolean[] finished; + + @Override + protected void testProgram() throws Exception { + FileCreator fileCreator = new FileCreator(INTERVAL); + Thread t = new Thread(fileCreator); + t.start(); + Thread.sleep(100); + + StringFileFormat format = new StringFileFormat(); + format.setFilePath(hdfsURI); + + Configuration config = new Configuration();
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61233284 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private File baseDir; + + private org.apache.hadoop.fs.FileSystem hdfs; + private String hdfsURI; + private MiniDFSCluster hdfsCluster; + + private Set hdPaths = new HashSet<>(); --- End diff -- These fields are not used. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61232860 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java --- @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +public class FileSplitMonitoringFunctionTest { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private static File baseDir; + + private static org.apache.hadoop.fs.FileSystem hdfs; + private static String hdfsURI; + private static MiniDFSCluster hdfsCluster; + + // PREPARING FOR THE TESTS + + @BeforeClass + public static void createHDFS() { + try { + baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set. 
+ + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + + hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; + hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf); + + } catch(Throwable e) { + e.printStackTrace(); + Assert.fail("Test failed " + e.getMessage()); + } + } + + @AfterClass + public static void destroyHDFS() { + FileUtil.fullyDelete(baseDir); + hdfsCluster.shutdown(); + } + + // END OF PREPARATIONS + + // TESTS + + @Test + public void testFileReadingOperator() throws Exception { + Set filesCreated = new HashSet<>(); + Map fileContents = new HashMap<>(); + for(int i = 0; i < NO_OF_FILES; i++) { + Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line."); + filesCreated.add(file.f0); + fileContents.put(i, file.f1); + } + + StringFileFormat format = new StringFileFormat(); + Configuration config = new Configuration(); + config.setString("input.file.path", hdfsURI); + +
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61232298 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; + + private transient SplitReader reader; + private transient Object checkpointLock; + private transient TimestampedCollector collector; + + private Configuration configuration; + private FileInputFormat format; + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) { + this.format = format; + this.typeInfo = typeInfo; + this.configuration = configuration; + this.unReadSplits = new ConcurrentLinkedQueue<>(); + + // this is for the extra thread that is reading, + // the tests were not terminating because the success + // exception was caught by the extra thread, and never + // forwarded. 
+ + // for now, and to keep the extra thread to avoid backpressure, + // we just disable chaining for the operator, so that it runs on + // another thread. This will change later for a cleaner design. + setChainingStrategy(ChainingStrategy.NEVER); + } + + @Override + public void open() throws Exception { + super.open(); + + this.format.configure(configuration); + this.collector = new TimestampedCollector(output); + this.checkpointLock = getContainingTask() + .getCheckpointLock(); + this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); + + this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock); + this.reader.start(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + LOG.info("Reading Split: " + element.getValue()); + this.unReadSplits.a
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61231656 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; + + private transient SplitReader reader; + private transient Object checkpointLock; + private transient TimestampedCollector collector; + + private Configuration configuration; + private FileInputFormat format; + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) { + this.format = format; + this.typeInfo = typeInfo; + this.configuration = configuration; + this.unReadSplits = new ConcurrentLinkedQueue<>(); + + // this is for the extra thread that is reading, + // the tests were not terminating because the success + // exception was caught by the extra thread, and never + // forwarded. 
+ + // for now, and to keep the extra thread to avoid backpressure, + // we just disable chaining for the operator, so that it runs on + // another thread. This will change later for a cleaner design. + setChainingStrategy(ChainingStrategy.NEVER); + } + + @Override + public void open() throws Exception { + super.open(); + + this.format.configure(configuration); + this.collector = new TimestampedCollector(output); + this.checkpointLock = getContainingTask() + .getCheckpointLock(); + this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); + + this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock); + this.reader.start(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + LOG.info("Reading Split: " + element.getValue()); + this.unReadSplits.a
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61231507 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; + + private transient SplitReader reader; + private transient Object checkpointLock; + private transient TimestampedCollector collector; + + private Configuration configuration; + private FileInputFormat format; + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) { + this.format = format; + this.typeInfo = typeInfo; + this.configuration = configuration; + this.unReadSplits = new ConcurrentLinkedQueue<>(); + + // this is for the extra thread that is reading, + // the tests were not terminating because the success + // exception was caught by the extra thread, and never + // forwarded. 
+ + // for now, and to keep the extra thread to avoid backpressure, + // we just disable chaining for the operator, so that it runs on + // another thread. This will change later for a cleaner design. + setChainingStrategy(ChainingStrategy.NEVER); + } + + @Override + public void open() throws Exception { + super.open(); + + this.format.configure(configuration); + this.collector = new TimestampedCollector(output); + this.checkpointLock = getContainingTask() + .getCheckpointLock(); + this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); + + this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock); + this.reader.start(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + LOG.info("Reading Split: " + element.getValue()); + this.unReadSplits.a
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61231326 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; --- End diff -- I think the queue can be moved to the `SplitReader`, the split reader can then have a method `addSplit`. This would better isolate concerns. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
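A sketch of the suggested refactoring (names are illustrative): the reader thread owns the queue and exposes {{addSplit()}}, so the operator no longer shares mutable queue state with it.
{code}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SplitReaderSketch<S> extends Thread {

    private final Queue<S> pendingSplits = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    /** Called from the operator's processElement(); the queue stays private to the reader. */
    public void addSplit(S split) {
        pendingSplits.add(split);
    }

    @Override
    public void run() {
        while (running) {
            S split = pendingSplits.poll();
            if (split != null) {
                // ... open the split with the input format and emit its records ...
            } else {
                try {
                    Thread.sleep(10); // avoid busy-spinning while the queue is empty
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    public void shutdown() {
        running = false;
    }
}
{code}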
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61230608 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; + + private transient SplitReader reader; + private transient Object checkpointLock; + private transient TimestampedCollector collector; + + private Configuration configuration; + private FileInputFormat format; + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) { + this.format = format; + this.typeInfo = typeInfo; + this.configuration = configuration; + this.unReadSplits = new ConcurrentLinkedQueue<>(); + + // this is for the extra thread that is reading, + // the tests were not terminating because the success + // exception was caught by the extra thread, and never + // forwarded. 
+ + // for now, and to keep the extra thread to avoid backpressure, + // we just disable chaining for the operator, so that it runs on + // another thread. This will change later for a cleaner design. + setChainingStrategy(ChainingStrategy.NEVER); + } + + @Override + public void open() throws Exception { + super.open(); + + this.format.configure(configuration); + this.collector = new TimestampedCollector(output); + this.checkpointLock = getContainingTask() + .getCheckpointLock(); + this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); + + this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock); + this.reader.start(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + LOG.info("Reading Split: " + element.getValue()); --- End diff -- Should
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61230524 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; + + private transient SplitReader reader; + private transient Object checkpointLock; + private transient TimestampedCollector collector; + + private Configuration configuration; + private FileInputFormat format; + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) { + this.format = format; + this.typeInfo = typeInfo; + this.configuration = configuration; + this.unReadSplits = new ConcurrentLinkedQueue<>(); + + // this is for the extra thread that is reading, + // the tests were not terminating because the success + // exception was caught by the extra thread, and never + // forwarded. 
+ + // for now, and to keep the extra thread to avoid backpressure, + // we just disable chaining for the operator, so that it runs on + // another thread. This will change later for a cleaner design. + setChainingStrategy(ChainingStrategy.NEVER); + } + + @Override + public void open() throws Exception { + super.open(); + + this.format.configure(configuration); + this.collector = new TimestampedCollector(output); --- End diff -- Superfluous `OUT` parameter, can be `<>` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
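The simplification being asked for is Java 7's diamond operator; a standalone illustration:
{code}
import java.util.ArrayList;
import java.util.List;

public class DiamondSketch {

    public static void main(String[] args) {
        List<String> explicit = new ArrayList<String>(); // redundant type argument
        List<String> inferred = new ArrayList<>();       // equivalent since Java 7
        explicit.add("a");
        inferred.add("a");
        System.out.println(explicit.equals(inferred));   // true
    }
}
{code}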
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61230544 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitReadOperator.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This is the operator that reads the splits received from {@link FileSplitMonitoringFunction}. + * This operator will receive just the split descriptors and then read and emit records. This may lead + * to backpressure. To avoid this, we will have another thread actually reading the splits and + * another forwarding the checkpoint barriers. The two should sync so that the checkpoints reflect the + * current state. + * */ +public class FileSplitReadOperator extends AbstractStreamOperator implements OneInputStreamOperator { + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + private Queue unReadSplits; + + private transient SplitReader reader; + private transient Object checkpointLock; + private transient TimestampedCollector collector; + + private Configuration configuration; + private FileInputFormat format; + private TypeInformation typeInfo; + private transient TypeSerializer serializer; + + public FileSplitReadOperator(FileInputFormat format, TypeInformation typeInfo, Configuration configuration) { + this.format = format; + this.typeInfo = typeInfo; + this.configuration = configuration; + this.unReadSplits = new ConcurrentLinkedQueue<>(); + + // this is for the extra thread that is reading, + // the tests were not terminating because the success + // exception was caught by the extra thread, and never + // forwarded. 
+ + // for now, and to keep the extra thread to avoid backpressure, + // we just disable chaining for the operator, so that it runs on + // another thread. This will change later for a cleaner design. + setChainingStrategy(ChainingStrategy.NEVER); + } + + @Override + public void open() throws Exception { + super.open(); + + this.format.configure(configuration); + this.collector = new TimestampedCollector(output); + this.checkpointLock = getContainingTask() + .getCheckpointLock(); + this.serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); + + this.reader = new SplitReader(unReadSplits, format, serializer, collector, checkpointLock); --- End diff -- Missing generic, can be `<>` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the f
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r61230370 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits + * to downstream tasks for further reading and processing. Which splits will be further processed + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}. + */ +public class FileSplitMonitoringFunction + extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + /** +* Specifies when computation will be triggered. +*/ + public enum WatchType { + REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended. + } + + /** The path to monitor. */ + private final String path; + + /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */ + private final int readerParallelism; + + /** The {@link FileInputFormat} to be read. */ + private FileInputFormat format; + + /** How often to monitor the state of the directory for new data. */ + private final long interval; + + /** Which new data to process (see {@link WatchType}. */ + private final WatchType watchType; + + private long globalModificationTime; + + private FilePathFilter pathFilter; + + private volatile boolean isRunning = false; + + private Configuration configuration; --- End diff -- We should explain why we have the `Configuration` here. That the `Configuration` that we get in `open()` is not valid. Same in the `FileSplitReadOperator`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
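A hedged sketch of the pattern the comment asks to document (using Flink's real {{Configuration}} class, but an otherwise hypothetical operator): the configuration is captured at graph-construction time because the one handed to {{open()}} at runtime is not the user-provided instance.
{code}
import java.io.Serializable;

import org.apache.flink.configuration.Configuration;

public class ConfigurationCaptureSketch implements Serializable {

    // Captured when the job graph is built and shipped with the serialized operator.
    private final Configuration configuration;

    public ConfigurationCaptureSketch(Configuration configuration) {
        this.configuration = configuration;
    }

    public void open(Configuration runtimeParameters) {
        // Deliberately ignore runtimeParameters and use the captured configuration,
        // e.g. to call format.configure(configuration) as the operators above do.
        // A comment like this in the real operator is what the review asks for.
    }
}
{code}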
[jira] [Created] (FLINK-3834) flink-statebackend-rocksdb
Till Rohrmann created FLINK-3834: Summary: flink-statebackend-rocksdb Key: FLINK-3834 URL: https://issues.apache.org/jira/browse/FLINK-3834 Project: Flink Issue Type: Sub-task Reporter: Till Rohrmann -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3833) flink-test-utils
Till Rohrmann created FLINK-3833: Summary: flink-test-utils Key: FLINK-3833 URL: https://issues.apache.org/jira/browse/FLINK-3833 Project: Flink Issue Type: Sub-task Reporter: Till Rohrmann -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3832) flink-streaming-scala
Till Rohrmann created FLINK-3832: Summary: flink-streaming-scala Key: FLINK-3832 URL: https://issues.apache.org/jira/browse/FLINK-3832 Project: Flink Issue Type: Sub-task Reporter: Till Rohrmann -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3831) flink-streaming-java
Till Rohrmann created FLINK-3831: Summary: flink-streaming-java Key: FLINK-3831 URL: https://issues.apache.org/jira/browse/FLINK-3831 Project: Flink Issue Type: Sub-task Reporter: Till Rohrmann -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3830) flink-scala
Till Rohrmann created FLINK-3830: Summary: flink-scala Key: FLINK-3830 URL: https://issues.apache.org/jira/browse/FLINK-3830 Project: Flink Issue Type: Sub-task Reporter: Till Rohrmann -- This message was sent by Atlassian JIRA (v6.3.4#6332)