[GitHub] flink issue #3081: [FLINK-5426] Clean up the Flink Machine Learning library
Github user Fokko commented on the issue: https://github.com/apache/flink/pull/3081 I've rebased both branches with master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5426) Clean up the Flink Machine Learning library
[ https://issues.apache.org/jira/browse/FLINK-5426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823579#comment-15823579 ]

ASF GitHub Bot commented on FLINK-5426:
---
Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3081

I've rebased both branches with master.

> Clean up the Flink Machine Learning library
> ---
>
> Key: FLINK-5426
> URL: https://issues.apache.org/jira/browse/FLINK-5426
> Project: Flink
> Issue Type: Improvement
> Components: Machine Learning Library
> Reporter: Fokko Driesprong
> Assignee: Fokko Driesprong
>
> Hi Guys,
> I would like to clean up the Machine Learning library. A lot of the code in
> the ML Library does not conform to the original contribution guide. For
> example:
> Duplicate tests, different names, but exactly the same test case:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala#L148
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala#L164
> A lot of multi-line test cases:
> https://github.com/Fokko/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
> Misuse of constants:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala#L58
> Please allow me to clean this up. I'm looking forward to contributing more
> code, especially to the ML part. I have been a contributor to Apache Spark
> and am happy to extend the codebase with new distributed algorithms and make
> the codebase more mature.
> Cheers, Fokko

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lincoln.lee updated FLINK-5498:
---
Description:
I found that the expected result of a unit test case is incorrect compared to the result in an RDBMS; see flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
{code:title=JoinITCase.scala}
def testRightJoinWithNotOnlyEquiJoin(): Unit = {
  ...
  val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
  val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

  val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)

  val expected = "Hello world,BCD\n"
  val results = joinT.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}
Then I took some time to learn about outer joins in relational databases. The correct result of the above case should be (tested in SQL Server and MySQL; the results are the same):
{code}
> select c, g from tuple3 right outer join tuple5 on a=f and bselect c, g from tuple3 right outer join tuple5 on a=f and b
[GitHub] flink issue #3081: [FLINK-5426] Clean up the Flink Machine Learning library
Github user Fokko commented on the issue: https://github.com/apache/flink/pull/3081 Hi @tillrohrmann, did you find any time to check #3077 and this PR?
[jira] [Assigned] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang reassigned FLINK-5499:

Assignee: Zhijiang Wang

> Try to reuse the resource location of prior execution attempt in allocating
> slot
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Reporter: Zhijiang Wang
> Assignee: Zhijiang Wang
>
> Currently, when an execution is scheduled and requests a slot from
> {{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection.
> So in a task failover scenario, the new execution attempt may be deployed to
> a different task manager. If RocksDB is used as the state backend,
> performance is better when the data can be restored from the local machine.
> So we try to reuse the {{TaskManagerLocation}} of the prior execution attempt
> when allocating a slot from {{SlotPool}}. If the {{TaskManagerLocation}} from
> prior executions is empty, the behavior is the same as it is today.
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5499: - Description: Currently when schedule execution to request to allocate slot from {{SlotPool}}, the {{TaskManagerLocation}} parameter is empty collection. So for task fail over scenario, the new execution attempt may be deployed to different task managers. If setting rockDB as state backend, the performance is better if the data can be restored from local machine. So we try to reuse the {{TaskManagerLocation}} of prior execution attempt when allocating slot from {{SlotPool}}. If the {{TaskManagerLocation}} is empty from prior executions, the behavior is the same with current status. (was: Currently when schedule execution to request to allocate slot from {{SlotPool}}, the {{TaskManagerLocation}} parameter is empty collection. So for task fail over scenario, the new execution attempt may be deployed to different task managers. If setting rockDB as state backend, the performance is better if the data can be restored from local machine. So we try to reuse the TaskManagerLocation of prior execution attempt when allocating slot from SlotPool. If the TaskManagerLocation is empty from prior executions, the behavior is the same with current status.) > Try to reuse the resource location of prior execution attempt in allocating > slot > > > Key: FLINK-5499 > URL: https://issues.apache.org/jira/browse/FLINK-5499 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Zhijiang Wang > > Currently when schedule execution to request to allocate slot from > {{SlotPool}}, the {{TaskManagerLocation}} parameter is empty collection. So > for task fail over scenario, the new execution attempt may be deployed to > different task managers. If setting rockDB as state backend, the performance > is better if the data can be restored from local machine. 
So we try to reuse > the {{TaskManagerLocation}} of prior execution attempt when allocating slot > from {{SlotPool}}. If the {{TaskManagerLocation}} is empty from prior > executions, the behavior is the same with current status.
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhijiang Wang updated FLINK-5499: - Description: Currently when schedule execution to request to allocate slot from {{SlotPool}}, the {{TaskManagerLocation}} parameter is empty collection. So for task fail over scenario, the new execution attempt may be deployed to different task managers. If setting rockDB as state backend, the performance is better if the data can be restored from local machine. So we try to reuse the TaskManagerLocation of prior execution attempt when allocating slot from SlotPool. If the TaskManagerLocation is empty from prior executions, the behavior is the same with current status. (was: Currently when schedule execution to request to allocate slot from SlotPool, the TaskManagerLocation parameter is empty collection. So for task fail over scenario, the new execution attempt may be deployed to different task managers. If setting rockDB as state backend, the performance is better if the data can be restored from local machine. So we try to reuse the TaskManagerLocation of prior execution attempt when allocating slot from SlotPool. If the TaskManagerLocation is empty from prior executions, the behavior is the same with current status.) > Try to reuse the resource location of prior execution attempt in allocating > slot > > > Key: FLINK-5499 > URL: https://issues.apache.org/jira/browse/FLINK-5499 > Project: Flink > Issue Type: Improvement > Components: JobManager >Reporter: Zhijiang Wang > > Currently when schedule execution to request to allocate slot from > {{SlotPool}}, the {{TaskManagerLocation}} parameter is empty collection. So > for task fail over scenario, the new execution attempt may be deployed to > different task managers. If setting rockDB as state backend, the performance > is better if the data can be restored from local machine. 
So we try to reuse > the TaskManagerLocation of prior execution attempt when allocating slot from > SlotPool. If the TaskManagerLocation is empty from prior executions, the > behavior is the same with current status.
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5499:
-
Summary: Try to reuse the resource location of prior execution attempt in allocating slot (was: Try to reuse the resource location of prior execution attempt when allocate slot)

> Try to reuse the resource location of prior execution attempt in allocating
> slot
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Reporter: Zhijiang Wang
>
> Currently, when an execution is scheduled and requests a slot from SlotPool,
> the TaskManagerLocation parameter is an empty collection. So in a task
> failover scenario, the new execution attempt may be deployed to a different
> task manager. If RocksDB is used as the state backend, performance is better
> when the data can be restored from the local machine. So we try to reuse the
> TaskManagerLocation of the prior execution attempt when allocating a slot
> from SlotPool. If the TaskManagerLocation from prior executions is empty, the
> behavior is the same as it is today.
[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt when allocate slot
[ https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang Wang updated FLINK-5499:
-
Summary: Try to reuse the resource location of prior execution attempt when allocate slot (was: Try to reuse the resource location of previous execution attempt when allocate slot)

> Try to reuse the resource location of prior execution attempt when allocate
> slot
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Reporter: Zhijiang Wang
>
> Currently, when an execution is scheduled and requests a slot from SlotPool,
> the TaskManagerLocation parameter is an empty collection. So in a task
> failover scenario, the new execution attempt may be deployed to a different
> task manager. If RocksDB is used as the state backend, performance is better
> when the data can be restored from the local machine. So we try to reuse the
> TaskManagerLocation of the prior execution attempt when allocating a slot
> from SlotPool. If the TaskManagerLocation from prior executions is empty, the
> behavior is the same as it is today.
[jira] [Created] (FLINK-5499) Try to reuse the resource location of previous execution attempt when allocate slot
Zhijiang Wang created FLINK-5499:

Summary: Try to reuse the resource location of previous execution attempt when allocate slot
Key: FLINK-5499
URL: https://issues.apache.org/jira/browse/FLINK-5499
Project: Flink
Issue Type: Improvement
Components: JobManager
Reporter: Zhijiang Wang

Currently, when an execution is scheduled and requests a slot from SlotPool, the TaskManagerLocation parameter is an empty collection. So in a task failover scenario, the new execution attempt may be deployed to a different task manager. If RocksDB is used as the state backend, performance is better when the data can be restored from the local machine. So we try to reuse the TaskManagerLocation of the prior execution attempt when allocating a slot from SlotPool. If the TaskManagerLocation from prior executions is empty, the behavior is the same as it is today.
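The allocation preference described in this issue can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's actual SlotPool API: `SlotPoolSketch`, `Slot`, `addFreeSlot`, and `allocate` are hypothetical names, and a task manager location is reduced to a plain string.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; none of these types are Flink classes.
final class SlotPoolSketch {

    /** A free slot, identified only by the task manager that hosts it. */
    public static final class Slot {
        public final String taskManagerLocation;

        Slot(String taskManagerLocation) {
            this.taskManagerLocation = taskManagerLocation;
        }
    }

    private final List<Slot> freeSlots = new ArrayList<>();

    public void addFreeSlot(String taskManagerLocation) {
        freeSlots.add(new Slot(taskManagerLocation));
    }

    /**
     * Allocates a slot, preferring one hosted on a location used by a prior
     * execution attempt. With an empty preference collection this falls back
     * to the first free slot, i.e. the behavior without any preference.
     */
    public Optional<Slot> allocate(Collection<String> priorLocations) {
        Iterator<Slot> it = freeSlots.iterator();
        while (it.hasNext()) {
            Slot slot = it.next();
            if (priorLocations.contains(slot.taskManagerLocation)) {
                it.remove();
                return Optional.of(slot);
            }
        }
        if (freeSlots.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(freeSlots.remove(0));
    }
}
```

A restarted task would pass the location of its previous attempt as `priorLocations`, so that locally stored state has a chance of being reused; a first attempt would pass an empty collection.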
[jira] [Updated] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lincoln.lee updated FLINK-5498:
---
Description:
I found that the expected result of a unit test case is incorrect compared to the result in an RDBMS; see flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
{code:title=JoinITCase.scala}
def testRightJoinWithNotOnlyEquiJoin(): Unit = {
  ...
  val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
  val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

  val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)

  val expected = "Hello world,BCD\n"
  val results = joinT.toDataSet[Row].collect()
  TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}
Then I took some time to learn about outer joins in relational databases. The correct result of the above case should be (tested in SQL Server and MySQL; the results are the same):
{panel}
> select c, g from tuple3 right outer join tuple5 on a=f and bselect c, g from tuple3 right outer join tuple5 on a=f and b
[jira] [Created] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins
lincoln.lee created FLINK-5498:
--

Summary: Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins
Key: FLINK-5498
URL: https://issues.apache.org/jira/browse/FLINK-5498
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Affects Versions: 1.1.4
Reporter: lincoln.lee
Assignee: lincoln.lee
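For readers unfamiliar with why a filter expression in the ON clause of an outer join matters, here is a minimal nested-loop sketch of standard SQL right outer join semantics. It is not Flink's implementation; `OuterJoinSketch` and `rightOuterJoin` are hypothetical names, and rows are plain `int[]` arrays. The point it shows is that the ON predicate only decides which rows match: a right-side row that matches nothing is still emitted, padded with NULL on the left.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical sketch of SQL right outer join semantics; not Flink code.
final class OuterJoinSketch {

    /**
     * Nested-loop right outer join. Each result entry is {leftRow, rightRow};
     * leftRow is null when no left row satisfies the ON predicate.
     */
    static List<int[][]> rightOuterJoin(
            List<int[]> left, List<int[]> right, BiPredicate<int[], int[]> on) {
        List<int[][]> out = new ArrayList<>();
        for (int[] r : right) {
            boolean matched = false;
            for (int[] l : left) {
                if (on.test(l, r)) {
                    out.add(new int[][] {l, r});
                    matched = true;
                }
            }
            if (!matched) {
                // The right row is preserved even though the ON predicate failed.
                out.add(new int[][] {null, r});
            }
        }
        return out;
    }
}
```

With an ON predicate such as `(l, r) -> l[0] == r[0] && l[1] < r[1]`, every right row appears in the output at least once, whereas applying the same non-equi condition as a filter after the join would drop the NULL-padded rows.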
[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries
[ https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823450#comment-15823450 ]

ASF GitHub Bot commented on FLINK-3475:
---
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3111#discussion_r96160082

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala ---
@@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
 val util = batchTestUtil()
 val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

-val resultTable = sourceTable.groupBy('a)
+// Move "where" before "groupBy" for the former query would generate
+// nondeterministic plans with same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
+// the importance of relNode may change, and the test may fail. This issue is mentioned
+// in FLINK-5394, we could move "where" to the end when FLINK-5394 is fixed.
+val resultTable = sourceTable.where('a === 1).groupBy('a)
--- End diff --

It might make sense to wait with this until #3058 is in. It is almost done I think.

> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Chengxiang Li
> Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function
> instead of separate implementation, and let Flink runtime take care of
> duplicate records.
[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3111#discussion_r96160329 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java --- @@ -0,0 +1,1144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.dataSet; + +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.fun.SqlCountAggFunction; +import org.apache.calcite.sql.fun.SqlMinMaxAggFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.fun.SqlSumAggFunction; +import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Planner rule that expands distinct aggregates + * (such as {@code COUNT(DISTINCT 
x)}) from a + * {@link org.apache.calcite.rel.logical.LogicalAggregate}. + * + * How this is done depends upon the arguments to the function. If all + * functions have the same argument + * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument + * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} is + * sufficient. + * + * If there are multiple arguments + * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)}) + * the rule creates separate {@code Aggregate}s and combines using a + * {@link org.apache.calcite.rel.core.Join}. + */ +public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule { --- End diff --

I would like to move this class to the `org.apache.flink.table.calcite` package and add a comment at the top of the class noting that this is a temporary solution that should be removed later, such as:

>This is a copy of Calcite's [[AggregateExpandDistinctAggregatesRule]] with a quick fix to avoid the bad case mentioned in CALCITE-1558. It should be dropped in favor of Calcite's [[AggregateExpandDistinctAggregatesRule]] when we upgrade to Calcite 1.12 (or above).
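The single-argument case described in the quoted Javadoc (all distinct aggregates share the argument `x`, so one extra aggregate is sufficient) can be illustrated with a small sketch. This is not the Calcite rule and uses no Calcite classes; `DistinctAggSketch` and `countAndSumDistinct` are hypothetical names, and rows are assumed to be `{groupKey, x}` pairs.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration of the rewrite idea; not Calcite or Flink code.
final class DistinctAggSketch {

    /**
     * Computes COUNT(DISTINCT x) and SUM(DISTINCT x) per group key.
     * Each input row is {groupKey, x}; each result value is {count, sum}.
     */
    static Map<Integer, long[]> countAndSumDistinct(List<int[]> rows) {
        // Step 1, the "extra aggregate": group by (groupKey, x) to drop duplicates.
        Set<List<Integer>> distinctPairs = new LinkedHashSet<>();
        for (int[] row : rows) {
            distinctPairs.add(Arrays.asList(row[0], row[1]));
        }
        // Step 2: plain COUNT and SUM over the deduplicated rows, grouped by key.
        Map<Integer, long[]> result = new TreeMap<>();
        for (List<Integer> pair : distinctPairs) {
            long[] acc = result.computeIfAbsent(pair.get(0), k -> new long[2]);
            acc[0] += 1;
            acc[1] += pair.get(1);
        }
        return result;
    }
}
```

Because both aggregates read the same deduplicated input, no join is needed here; the multi-argument case mentioned in the Javadoc would need one deduplication per distinct argument and a join of the partial results.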
[jira] [Commented] (FLINK-4930) Implement FLIP-6 YARN client
[ https://issues.apache.org/jira/browse/FLINK-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823397#comment-15823397 ] ASF GitHub Bot commented on FLINK-4930: --- Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/2935 > Implement FLIP-6 YARN client > > > Key: FLINK-4930 > URL: https://issues.apache.org/jira/browse/FLINK-4930 > Project: Flink > Issue Type: Sub-task > Components: YARN > Environment: {{flip-6}} feature branch >Reporter: Stephan Ewen >Assignee: shuai.xu > > The FLIP-6 YARN client can follow parts of the existing YARN client. > The main difference is that it does not wait for the cluster to be fully > started and for all TaskManagers to register. It simply submits > - Set up all configurations and environment variables > - Set up the resources: Flink jar, utility jars (logging), user jar > - Set up attached tokens / certificates > - Submit the Yarn application > - Listen for leader / attempt to connect to the JobManager to subscribe to > updates > - Integration with the Flink CLI (command line interface) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2935: [FLINK-4930] [client, yarn] Implement FLIP-6 YARN ...
Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/2935
[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11
[ https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823395#comment-15823395 ] Jark Wu commented on FLINK-5414: Calcite 1.11 has been released. I will start working on this issue in the next few days. > Bump up Calcite version to 1.11 > --- > > Key: FLINK-5414 > URL: https://issues.apache.org/jira/browse/FLINK-5414 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > The upcoming Calcite release 1.11 has a lot of stability fixes and new > features. We should update it for the Table API. > E.g. we can hopefully merge FLINK-4864 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5118) Inconsistent records sent/received metrics
[ https://issues.apache.org/jira/browse/FLINK-5118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823272#comment-15823272 ] ASF GitHub Bot commented on FLINK-5118: --- Github user xhumanoid commented on the issue: https://github.com/apache/flink/pull/3106 @zentol I asked because you always check for null before writing to the Counter. Or is that check there to guard against uninitialized state? > Inconsistent records sent/received metrics > -- > > Key: FLINK-5118 > URL: https://issues.apache.org/jira/browse/FLINK-5118 > Project: Flink > Issue Type: Bug > Components: Metrics, Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Ufuk Celebi >Assignee: Chesnay Schepler > Fix For: 1.2.0, 1.3.0 > > > In 1.2-SNAPSHOT running a large scale job you see that the counts for > send/received records are inconsistent, e.g. in a simple word count job we > see more received records/bytes than we see sent. This is a regression from > 1.1 where everything works as expected. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3106: [FLINK-5118] Fix inconsistent numBytesIn/Out metrics
Github user xhumanoid commented on the issue: https://github.com/apache/flink/pull/3106 @zentol I asked because you always check for null before writing to the Counter. Or is that check there to guard against uninitialized state?
[jira] [Commented] (FLINK-5118) Inconsistent records sent/received metrics
[ https://issues.apache.org/jira/browse/FLINK-5118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823265#comment-15823265 ] ASF GitHub Bot commented on FLINK-5118: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3106 @xhumanoid The metrics returned by the TaskIOMetricGroup can't actually be null, so I wouldn't put too much thought into dealing with that case.
[GitHub] flink issue #3106: [FLINK-5118] Fix inconsistent numBytesIn/Out metrics
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3106 @xhumanoid The metrics returned by the TaskIOMetricGroup can't actually be null, so I wouldn't put too much thought into dealing with that case.
[jira] [Created] (FLINK-5497) remove duplicated tests
Alexey Diomin created FLINK-5497: Summary: remove duplicated tests Key: FLINK-5497 URL: https://issues.apache.org/jira/browse/FLINK-5497 Project: Flink Issue Type: Improvement Components: Tests Reporter: Alexey Diomin Priority: Minor We currently have a test that runs the same code 4 times, and each run takes 17+ seconds. We need to do a small refactoring and remove the duplicated code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5118) Inconsistent records sent/received metrics
[ https://issues.apache.org/jira/browse/FLINK-5118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823250#comment-15823250 ] ASF GitHub Bot commented on FLINK-5118: --- Github user xhumanoid commented on the issue: https://github.com/apache/flink/pull/3106 @zentol what do you think about removing if (numBytesOut != null) { and replacing numBytesOut = metrics.getNumBytesOutCounter(); with + if (metrics.getNumBytesOutCounter() != null) { +numBytesOut = metrics.getNumBytesOutCounter(); + } else { +numBytesOut = new NullCounter(); + } where NullCounter has an empty implementation for every method? Pro: we do the null check in one place, because sometimes we may forget to do it. Con: we may sometimes break devirtualization and inlining for the counter.inc(..) method.
[GitHub] flink issue #3106: [FLINK-5118] Fix inconsistent numBytesIn/Out metrics
Github user xhumanoid commented on the issue: https://github.com/apache/flink/pull/3106 @zentol what do you think about removing if (numBytesOut != null) { and replacing numBytesOut = metrics.getNumBytesOutCounter(); with + if (metrics.getNumBytesOutCounter() != null) { +numBytesOut = metrics.getNumBytesOutCounter(); + } else { +numBytesOut = new NullCounter(); + } where NullCounter has an empty implementation for every method? Pro: we do the null check in one place, because sometimes we may forget to do it. Con: we may sometimes break devirtualization and inlining for the counter.inc(..) method.
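The NullCounter idea proposed above is an instance of the null-object pattern. A minimal sketch follows, assuming a simplified stand-in interface: SketchCounter, BasicCounter, and CounterSupport.orNoOp are hypothetical names for illustration only and are not Flink's actual metrics API.

```java
// Hypothetical stand-in for a metrics counter; not Flink's real Counter interface.
interface SketchCounter {
    void inc();
    void inc(long n);
    long getCount();
}

// Ordinary counter that actually accumulates a value.
final class BasicCounter implements SketchCounter {
    private long count;

    @Override
    public void inc() {
        count++;
    }

    @Override
    public void inc(long n) {
        count += n;
    }

    @Override
    public long getCount() {
        return count;
    }
}

// Null object: every method is a no-op, so callers never need a null check.
final class NullCounter implements SketchCounter {
    @Override
    public void inc() {
        // intentionally empty
    }

    @Override
    public void inc(long n) {
        // intentionally empty
    }

    @Override
    public long getCount() {
        return 0L;
    }
}

final class CounterSupport {
    private CounterSupport() {
    }

    // The single place where the null check happens.
    static SketchCounter orNoOp(SketchCounter maybeNull) {
        return maybeNull != null ? maybeNull : new NullCounter();
    }
}
```

With this shape the call sites simply invoke inc(..) unconditionally. The trade-off raised in the comment still applies: a second implementation of the interface can make the call site polymorphic, which may hinder inlining by the JIT.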
[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
[ https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823235#comment-15823235 ] ASF GitHub Bot commented on FLINK-5355: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145735 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp return getShardIteratorResult.getShardIterator(); } + /** +* Determines whether the exception can be recovered from using +* exponential-backoff +* +* @param ex +*Exception to inspect --- End diff -- nit: I think the Javadoc formatting here is inconsistent with the other methods (line change). > Handle AmazonKinesisException gracefully in Kinesis Streaming Connector > --- > > Key: FLINK-5355 > URL: https://issues.apache.org/jira/browse/FLINK-5355 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.2.0, 1.1.3 >Reporter: Scott Kidder >Assignee: Scott Kidder > > My Flink job that consumes from a Kinesis stream must be restarted at least > once daily due to an uncaught AmazonKinesisException when reading from > Kinesis. 
The complete stacktrace looks like: > {noformat} > com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: > AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: > dc1b7a1a-1b97-1a32-8cd5-79a896a55223) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583) > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193) > at > org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268) > at > org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > It's interesting that the Kinesis endpoint returned a 500 status code, but > that's outside the scope of this issue. > I think we can handle this exception in the same manner as a > ProvisionedThroughputException: performing an exponential backoff and > retrying a finite number of times before throwing an exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
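The behaviour requested in the FLINK-5355 description above (exponential backoff, retrying a finite number of times before rethrowing) can be sketched roughly as follows. This is an illustrative sketch only, not the connector's implementation: BackoffRetry, its parameters, and the fixed growth factor of 2.0 are assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical helper; names and parameters are assumptions for illustration.
final class BackoffRetry {
    private BackoffRetry() {
    }

    // Delay before retry number `attempt` (0-based): base * power^attempt, capped at maxMillis.
    static long backoffMillis(long baseMillis, long maxMillis, double power, int attempt) {
        double raw = baseMillis * Math.pow(power, attempt);
        return (long) Math.min(raw, (double) maxMillis);
    }

    // Runs `call`, retrying recoverable failures at most `maxRetries` times.
    // Non-recoverable failures, and the last failure once retries are exhausted, are rethrown.
    static <T> T callWithRetries(
            Callable<T> call,
            Predicate<Exception> isRecoverable,
            int maxRetries,
            long baseMillis,
            long maxMillis) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception ex) {
                if (attempt >= maxRetries || !isRecoverable.test(ex)) {
                    throw ex;
                }
                Thread.sleep(backoffMillis(baseMillis, maxMillis, 2.0, attempt));
                attempt++;
            }
        }
    }
}
```

In the connector, the predicate would be where a check such as isRecoverableException from the pull request plugs in. A production version would typically also add random jitter to the delay so that many consumers do not retry in lockstep.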
[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
[ https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823236#comment-15823236 ] ASF GitHub Bot commented on FLINK-5355: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145793 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import static org.junit.Assert.*; --- End diff -- In Flink we generally try to avoid asterisk imports. The style check doesn't actually check the test scope, but it'll be good to try to be consistent with the style rules in tests also. 
[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
[ https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823233#comment-15823233 ] ASF GitHub Bot commented on FLINK-5355: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145784 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; + +/** + * Test for methods in the KinesisProxy class. --- End diff -- The reference to `KinesisProxy` should be a Javadoc link, like in the other Javadocs.
[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
[ https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823231#comment-15823231 ] ASF GitHub Bot commented on FLINK-5355: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145771 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; + +/** + * Test for methods in the KinesisProxy class. 
+ * --- End diff -- Extra empty line
[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
[ https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823232#comment-15823232 ] ASF GitHub Bot commented on FLINK-5355: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145761 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp return getShardIteratorResult.getShardIterator(); } + /** +* Determines whether the exception can be recovered from using +* exponential-backoff +* +* @param ex +*Exception to inspect +* @return true if the exception can be recovered from, else +* false +*/ + protected static boolean isRecoverableException(AmazonServiceException ex) { + if (ex.getErrorType() == null) { + return false; + } + + switch (ex.getErrorType()) { + case Client: + if (ex instanceof ProvisionedThroughputExceededException) { --- End diff -- It'll probably be cleaner to just do `ex instanceof ProvisionedThroughputExceededException `
[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
[ https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823234#comment-15823234 ] ASF GitHub Bot commented on FLINK-5355: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145754 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp return getShardIteratorResult.getShardIterator(); } + /** +* Determines whether the exception can be recovered from using +* exponential-backoff +* +* @param ex +*Exception to inspect +* @return true if the exception can be recovered from, else +* false +*/ + protected static boolean isRecoverableException(AmazonServiceException ex) { + if (ex.getErrorType() == null) { + return false; + } + + switch (ex.getErrorType()) { --- End diff -- The indentation for the cases here seem to be missing. > Handle AmazonKinesisException gracefully in Kinesis Streaming Connector > --- > > Key: FLINK-5355 > URL: https://issues.apache.org/jira/browse/FLINK-5355 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.2.0, 1.1.3 >Reporter: Scott Kidder >Assignee: Scott Kidder > > My Flink job that consumes from a Kinesis stream must be restarted at least > once daily due to an uncaught AmazonKinesisException when reading from > Kinesis. 
The complete stacktrace looks like: > {noformat} > com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: > AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: > dc1b7a1a-1b97-1a32-8cd5-79a896a55223) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583) > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193) > at > org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268) > at > org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > It's interesting that the Kinesis endpoint returned a 500 status code, but > that's outside the scope of this issue. > I think we can handle this exception in the same manner as a > ProvisionedThroughputException: performing an exponential backoff and > retrying a finite number of times before throwing an exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
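The proposal above (exponential backoff with a finite number of retries) can be sketched roughly as follows. This is only an illustration, not the connector's actual code: `BackoffRetrySketch`, `RecoverableException`, `backoffMillis`, and `callWithRetries` are hypothetical names, and the doubling factor and delay cap are assumed values.

```java
import java.util.function.Supplier;

public class BackoffRetrySketch {

    /** Hypothetical stand-in for a service error that is worth retrying. */
    static class RecoverableException extends RuntimeException {
        RecoverableException(String message) {
            super(message);
        }
    }

    /** Delay before retry number `attempt` (0-based): base * power^attempt, capped at maxMillis. */
    static long backoffMillis(long baseMillis, long maxMillis, double power, int attempt) {
        double delay = baseMillis * Math.pow(power, attempt);
        return (long) Math.min(delay, (double) maxMillis);
    }

    /** Runs the call, retrying recoverable failures at most maxRetries times, then rethrows. */
    static <T> T callWithRetries(Supplier<T> call, int maxRetries, long baseMillis, long maxMillis) {
        int attempt = 0;
        while (true) {
            try {
                return call.get();
            } catch (RecoverableException ex) {
                if (attempt >= maxRetries) {
                    throw ex; // retries exhausted: surface the original error
                }
                try {
                    Thread.sleep(backoffMillis(baseMillis, maxMillis, 2.0, attempt));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while backing off", ie);
                }
                attempt++;
            }
        }
    }
}
```

Non-recoverable exceptions are not caught and propagate immediately, so only errors classified as recoverable consume the retry budget.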
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145784 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; + +/** + * Test for methods in the KinesisProxy class. --- End diff -- Should link the `KinesisProxy` referencing, like other Javadocs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145735

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
         return getShardIteratorResult.getShardIterator();
       }

    +  /**
    +   * Determines whether the exception can be recovered from using
    +   * exponential-backoff
    +   *
    +   * @param ex
    +   *            Exception to inspect
    --- End diff --

    nit: I think the Javadoc formatting here is inconsistent with the other methods (line change).
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3078#discussion_r96145771 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; + +/** + * Test for methods in the KinesisProxy class. + * --- End diff -- Extra empty line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145754

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
         return getShardIteratorResult.getShardIterator();
       }

    +  /**
    +   * Determines whether the exception can be recovered from using
    +   * exponential-backoff
    +   *
    +   * @param ex
    +   *            Exception to inspect
    +   * @return true if the exception can be recovered from, else
    +   *         false
    +   */
    +  protected static boolean isRecoverableException(AmazonServiceException ex) {
    +    if (ex.getErrorType() == null) {
    +      return false;
    +    }
    +
    +    switch (ex.getErrorType()) {
    --- End diff --

    The indentation for the cases here seems to be missing.
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145761

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
         return getShardIteratorResult.getShardIterator();
       }

    +  /**
    +   * Determines whether the exception can be recovered from using
    +   * exponential-backoff
    +   *
    +   * @param ex
    +   *            Exception to inspect
    +   * @return true if the exception can be recovered from, else
    +   *         false
    +   */
    +  protected static boolean isRecoverableException(AmazonServiceException ex) {
    +    if (ex.getErrorType() == null) {
    +      return false;
    +    }
    +
    +    switch (ex.getErrorType()) {
    +    case Client:
    +      if (ex instanceof ProvisionedThroughputExceededException) {
    --- End diff --

    It'll probably be cleaner to just do `ex instanceof ProvisionedThroughputExceededException`
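One possible reading of this suggestion is to return the `instanceof` result directly instead of wrapping it in an `if`. The sketch below illustrates that shape with self-contained stand-in types: `ErrorType`, `ServiceException`, and `ThroughputExceededException` are hypothetical substitutes for the AWS SDK classes named in the diff. Treating the `Service` and `Unknown` cases as recoverable is an assumption made for illustration, since the quoted diff excerpt ends before those cases.

```java
public class RecoverableCheckSketch {

    /** Stand-in for the SDK's error type enum (hypothetical, for illustration only). */
    enum ErrorType { Client, Service, Unknown }

    /** Stand-in for AmazonServiceException. */
    static class ServiceException extends RuntimeException {
        private final ErrorType errorType;

        ServiceException(ErrorType errorType) {
            this.errorType = errorType;
        }

        ErrorType getErrorType() {
            return errorType;
        }
    }

    /** Stand-in for ProvisionedThroughputExceededException (a client-side error). */
    static class ThroughputExceededException extends ServiceException {
        ThroughputExceededException() {
            super(ErrorType.Client);
        }
    }

    /** Returns true if retrying with backoff is assumed to be worthwhile. */
    static boolean isRecoverableException(ServiceException ex) {
        if (ex.getErrorType() == null) {
            return false;
        }

        switch (ex.getErrorType()) {
            case Client:
                // Return the instanceof result directly rather than if/else with true/false.
                return ex instanceof ThroughputExceededException;
            case Service:
            case Unknown:
                // Assumption: server-side and unclassified errors are treated as transient.
                return true;
            default:
                return false;
        }
    }
}
```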
[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3078#discussion_r96145793

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kinesis.proxy;
    +
    +import static org.junit.Assert.*;
    --- End diff --

    In Flink we generally try to avoid asterisk imports.
    The style check doesn't actually check the test scope, but it'll be good to try to be consistent with the style rules in tests also.
[jira] [Created] (FLINK-5496) ClassCastException when using Mesos HA mode
Till Rohrmann created FLINK-5496:

             Summary: ClassCastException when using Mesos HA mode
                 Key: FLINK-5496
                 URL: https://issues.apache.org/jira/browse/FLINK-5496
             Project: Flink
          Issue Type: Bug
          Components: Mesos
    Affects Versions: 1.2.0, 1.3.0
            Reporter: Till Rohrmann
            Priority: Critical
             Fix For: 1.2.0, 1.3.0

When using Mesos HA mode, one cannot start the Mesos appmaster because the following class cast exception occurs:

{code}
java.lang.ClassCastException: org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl cannot be cast to org.apache.flink.mesos.shaded.org.apache.curator.framework.CuratorFramework
	at org.apache.flink.mesos.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:38)
	at org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.createWorkerStore(MesosApplicationMasterRunner.java:510)
	at org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.runPrivileged(MesosApplicationMasterRunner.java:320)
	at org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner$1.call(MesosApplicationMasterRunner.java:178)
	at org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner$1.call(MesosApplicationMasterRunner.java:175)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
	at org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.run(MesosApplicationMasterRunner.java:175)
	at org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.main(MesosApplicationMasterRunner.java:135)
{code}

It seems that the {{flink-mesos}} module relocates the curator dependency into a different namespace than {{flink-runtime}} does. Not sure why this is done.
[jira] [Created] (FLINK-5495) ZooKeeperMesosWorkerStore cannot be instantiated
Till Rohrmann created FLINK-5495:

             Summary: ZooKeeperMesosWorkerStore cannot be instantiated
                 Key: FLINK-5495
                 URL: https://issues.apache.org/jira/browse/FLINK-5495
             Project: Flink
          Issue Type: Bug
          Components: Mesos
    Affects Versions: 1.2.0
            Reporter: Till Rohrmann
            Priority: Critical

The {{ZooKeeperMesosWorkerStore}} cannot be instantiated because it dynamically instantiates a {{ZooKeeperStateHandleStore}} without providing an {{Executor}} to the constructor. This effectively breaks Mesos HA mode.
[jira] [Created] (FLINK-5494) Improve Mesos documentation
Till Rohrmann created FLINK-5494:

             Summary: Improve Mesos documentation
                 Key: FLINK-5494
                 URL: https://issues.apache.org/jira/browse/FLINK-5494
             Project: Flink
          Issue Type: Sub-task
          Components: Mesos
    Affects Versions: 1.2.0
            Reporter: Till Rohrmann
             Fix For: 1.2.0

Flink's Mesos documentation could benefit from more details on how to set things up and which parameters to use.
[jira] [Created] (FLINK-5493) FlinkDistributionOverlay does not properly display missing environment variables
Till Rohrmann created FLINK-5493:

             Summary: FlinkDistributionOverlay does not properly display missing environment variables
                 Key: FLINK-5493
                 URL: https://issues.apache.org/jira/browse/FLINK-5493
             Project: Flink
          Issue Type: Bug
          Components: Cluster Management
    Affects Versions: 1.2.0, 1.3.0
            Reporter: Till Rohrmann
            Priority: Minor
             Fix For: 1.3.0

The class {{FlinkDistributionOverlay}} does not properly log missing environment variables in case of an error. This should be changed so that users know which variables they have to set.
[jira] [Updated] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml
[ https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5489: -- Component/s: Build System > maven release:prepare fails due to invalid JDOM comments in pom.xml > --- > > Key: FLINK-5489 > URL: https://issues.apache.org/jira/browse/FLINK-5489 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.2.0, 1.3.0 >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Minor > Labels: newbie > Fix For: 1.3.0 > > > When I was trying to publish Flink to our internal artifactory, I found out > that {{maven release:prepare}} has failed because the plugin complains about > the some of the comments pom.xml do not conform with the JDOM format: > {noformat} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on > project flink-parent: Execution default-cli of goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data > "- > [ERROR] This module is used a dependency in the root pom. It activates > shading for all sub modules > [ERROR] through an include rule in the shading configuration. This assures > that Maven always generates > [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. > In particular, this is needed > [ERROR] to define the Scala version property in the root pom but not let the > root pom depend on Scala > [ERROR] and thus be suffixed along with all other modules. > [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a > hyphen. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
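The rule behind this error message can be illustrated with a small check. This is a hypothetical stand-in written for this thread, not JDOM's actual verifier: the leading-hyphen message is taken from the log above, while the checks for `--` inside the data and for a trailing hyphen follow from the XML 1.0 comment grammar.

```java
public class CommentDataCheckSketch {

    /**
     * Returns a reason why the text between the comment delimiters is rejected,
     * or null if it is acceptable under the rules sketched here.
     */
    static String checkCommentData(String data) {
        if (data == null) {
            return "Comment data cannot be null.";
        }
        if (data.startsWith("-")) {
            // The case reported in FLINK-5489: the comment was opened with an extra hyphen.
            return "Comment data cannot start with a hyphen.";
        }
        if (data.contains("--")) {
            return "Comment data cannot contain double hyphens.";
        }
        if (data.endsWith("-")) {
            return "Comment data cannot end with a hyphen.";
        }
        return null;
    }
}
```

Under these rules, a pom.xml comment whose text begins right after the opening delimiter with another hyphen is rejected, while the same text without the extra hyphen is accepted.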
[jira] [Commented] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml
[ https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823170#comment-15823170 ] ASF GitHub Bot commented on FLINK-5489: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3123 > maven release:prepare fails due to invalid JDOM comments in pom.xml > --- > > Key: FLINK-5489 > URL: https://issues.apache.org/jira/browse/FLINK-5489 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.2.0, 1.3.0 >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Minor > Labels: newbie > Fix For: 1.3.0 > > > When I was trying to publish Flink to our internal artifactory, I found out > that {{maven release:prepare}} has failed because the plugin complains about > the some of the comments pom.xml do not conform with the JDOM format: > {noformat} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on > project flink-parent: Execution default-cli of goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data > "- > [ERROR] This module is used a dependency in the root pom. It activates > shading for all sub modules > [ERROR] through an include rule in the shading configuration. This assures > that Maven always generates > [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. > In particular, this is needed > [ERROR] to define the Scala version property in the root pom but not let the > root pom depend on Scala > [ERROR] and thus be suffixed along with all other modules. > [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a > hyphen. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml
[ https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-5489. --- Resolution: Fixed Fix Version/s: 1.3.0 Thank you for fixing this! Resolved in master with commit http://git-wip-us.apache.org/repos/asf/flink/commit/e2ba042c > maven release:prepare fails due to invalid JDOM comments in pom.xml > --- > > Key: FLINK-5489 > URL: https://issues.apache.org/jira/browse/FLINK-5489 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.2.0, 1.3.0 >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Minor > Labels: newbie > Fix For: 1.3.0 > > > When I was trying to publish Flink to our internal artifactory, I found out > that {{maven release:prepare}} has failed because the plugin complains about > the some of the comments pom.xml do not conform with the JDOM format: > {noformat} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on > project flink-parent: Execution default-cli of goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data > "- > [ERROR] This module is used a dependency in the root pom. It activates > shading for all sub modules > [ERROR] through an include rule in the shading configuration. This assures > that Maven always generates > [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. > In particular, this is needed > [ERROR] to define the Scala version property in the root pom but not let the > root pom depend on Scala > [ERROR] and thus be suffixed along with all other modules. > [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a > hyphen. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3123: [FLINK-5489] maven release:prepare fails due to in...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3123
[jira] [Commented] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml
[ https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823168#comment-15823168 ] ASF GitHub Bot commented on FLINK-5489: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3123 Good change. Thank you. I'll merge it right away > maven release:prepare fails due to invalid JDOM comments in pom.xml > --- > > Key: FLINK-5489 > URL: https://issues.apache.org/jira/browse/FLINK-5489 > Project: Flink > Issue Type: Bug >Affects Versions: 1.2.0, 1.3.0 >Reporter: Haohui Mai >Assignee: Haohui Mai >Priority: Minor > Labels: newbie > > When I was trying to publish Flink to our internal artifactory, I found out > that {{maven release:prepare}} has failed because the plugin complains about > the some of the comments pom.xml do not conform with the JDOM format: > {noformat} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on > project flink-parent: Execution default-cli of goal > org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data > "- > [ERROR] This module is used a dependency in the root pom. It activates > shading for all sub modules > [ERROR] through an include rule in the shading configuration. This assures > that Maven always generates > [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. > In particular, this is needed > [ERROR] to define the Scala version property in the root pom but not let the > root pom depend on Scala > [ERROR] and thus be suffixed along with all other modules. > [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a > hyphen. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3123: [FLINK-5489] maven release:prepare fails due to invalid J...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3123 Good change. Thank you. I'll merge it right away
[jira] [Commented] (FLINK-5380) Number of outgoing records not reported in web interface
[ https://issues.apache.org/jira/browse/FLINK-5380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823165#comment-15823165 ] ASF GitHub Bot commented on FLINK-5380: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3068 +1 to merge ![image](https://cloud.githubusercontent.com/assets/89049/21963634/948c5988-db3e-11e6-8641-6089521e9d87.png) > Number of outgoing records not reported in web interface > > > Key: FLINK-5380 > URL: https://issues.apache.org/jira/browse/FLINK-5380 > Project: Flink > Issue Type: Bug > Components: Metrics, Streaming, Webfrontend >Affects Versions: 1.2.0, 1.3.0 >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.2.0, 1.3.0 > > Attachments: outRecordsNotreported.png > > > The web frontend does not report any outgoing records in the web frontend. > The amount of data in MB is reported correctly. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #3068: [FLINK-5380] Fix task metrics reuse for single-operator c...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3068 +1 to merge ![image](https://cloud.githubusercontent.com/assets/89049/21963634/948c5988-db3e-11e6-8641-6089521e9d87.png)
[jira] [Updated] (FLINK-5491) Document default settings for yarn cluster mode
[ https://issues.apache.org/jira/browse/FLINK-5491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5491: - Description: When starting a per job cluster with {{flink run -m yarn-cluster}}, then it is possible to configure different settings such as job manager memory, task manager memory and the number of slots, for example. All of these settings have a default value which are nowhere documented. I think it would be helpful to show the default values when calling {{flink run -h}} and also to document them online. Some of the default values seem to be defined in {{AbstractYarnClusterDescriptor}}. was: When starting a per job cluster with {{flink run -m yarn-cluster}}, then it is possible to configure different settings such as job manager memory, task manager memory and the number of slots, for example. All of these settings have a default value which are nowhere documented. I think it would be helpful to show the default values when calling {{flink run -h}} and also to document them online. > Document default settings for yarn cluster mode > --- > > Key: FLINK-5491 > URL: https://issues.apache.org/jira/browse/FLINK-5491 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Till Rohrmann >Priority: Minor > Fix For: 1.2.0, 1.3.0 > > > When starting a per job cluster with {{flink run -m yarn-cluster}}, then it > is possible to configure different settings such as job manager memory, task > manager memory and the number of slots, for example. > All of these settings have a default value which are nowhere documented. I > think it would be helpful to show the default values when calling {{flink run > -h}} and also to document them online. > Some of the default values seem to be defined in > {{AbstractYarnClusterDescriptor}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem
Till Rohrmann created FLINK-5492:

             Summary: BootstrapTools log wrong address of started ActorSystem
                 Key: FLINK-5492
                 URL: https://issues.apache.org/jira/browse/FLINK-5492
             Project: Flink
          Issue Type: Bug
            Reporter: Till Rohrmann
            Priority: Minor

When starting an {{ActorSystem}} via {{BootstrapTools}}, the {{startActorSystem}} function logs the IP address resolved from the provided hostname as the {{ActorSystem}} address. However, the function then uses the unresolved hostname to start the {{ActorSystem}}. Since Akka matches the {{ActorSystem}}'s address against the destination address of each incoming message, we should log the URL that is actually used to start the {{ActorSystem}} and not the resolved IP (messages addressed to the IP will usually be rejected).
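A small model of the mismatch described in this issue, under stated assumptions: the `akka.tcp://name@host:port` URL shape is the usual Akka remote address form, while the system name `flink`, the host names, the port, and the helpers `akkaUrl` and `wouldAccept` are hypothetical. Akka's address matching is simplified here to plain string equality.

```java
public class ActorSystemAddressSketch {

    /** Builds a remote actor system URL of the form akka.tcp://name@host:port. */
    static String akkaUrl(String systemName, String host, int port) {
        return "akka.tcp://" + systemName + "@" + host + ":" + port;
    }

    /** Simplified model: a message is accepted only if its destination equals the system's own URL. */
    static boolean wouldAccept(String systemUrl, String destinationUrl) {
        return systemUrl.equals(destinationUrl);
    }

    public static void main(String[] args) {
        String hostname = "jobmanager.example.org"; // unresolved hostname used to start the system
        String resolvedIp = "10.0.0.5";             // what a DNS lookup might return (made-up value)

        String startedAs = akkaUrl("flink", hostname, 6123);
        String loggedAs = akkaUrl("flink", resolvedIp, 6123);

        // A client that copies the logged (IP-based) URL would address the wrong system URL.
        System.out.println("started as: " + startedAs);
        System.out.println("logged as:  " + loggedAs);
        System.out.println("accepted:   " + wouldAccept(startedAs, loggedAs));
    }
}
```

Logging `startedAs` instead of `loggedAs` gives users the address that remote senders actually need to use.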
[jira] [Created] (FLINK-5491) Document default settings for yarn cluster mode
Till Rohrmann created FLINK-5491:

             Summary: Document default settings for yarn cluster mode
                 Key: FLINK-5491
                 URL: https://issues.apache.org/jira/browse/FLINK-5491
             Project: Flink
          Issue Type: Sub-task
            Reporter: Till Rohrmann
            Priority: Minor

When starting a per-job cluster with {{flink run -m yarn-cluster}}, it is possible to configure settings such as the job manager memory, the task manager memory, and the number of slots.

All of these settings have default values, which are not documented anywhere. I think it would be helpful to show the default values when calling {{flink run -h}} and also to document them online.
[jira] [Deleted] (FLINK-5475) Extend DC/OS documentation
[ https://issues.apache.org/jira/browse/FLINK-5475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann deleted FLINK-5475: - > Extend DC/OS documentation > -- > > Key: FLINK-5475 > URL: https://issues.apache.org/jira/browse/FLINK-5475 > Project: Flink > Issue Type: Sub-task >Reporter: Till Rohrmann >Priority: Minor > > We could extend the DC/OS documentation a little bit to include information > about how to submit a job (where to find the connection information) and that > one has to install the DC/OS cli in order to add the development universe. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #3124: [FLINK-5281] Extend KafkaJsonTableSources to suppo...
GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/3124 [FLINK-5281] Extend KafkaJsonTableSources to support nested data Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed I've added support for serialization and deserialization of nested Rows in Json serializer/deserializer. I tried not to change interfaces, but I wonder if would make sense to replace all `String[] fieldNames, TypeInformation[] fieldTypes` pairs with `RowTypeInfo`. This will change the interface (but we are doing this anyway) but will make interfaces clearer and will help to avoid some code duplication. @fhueske, what do you think about this? 
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink json-nested-table-source

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3124.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3124

commit 3463919b94dd11796ad71d4983644322be38a209
Author: Ivan Mushketyk
Date:   2017-01-14T14:33:51Z

    [FLINK-5281] Extend KafkaJsonTableSources to support nested data
[jira] [Commented] (FLINK-5281) Extend KafkaJsonTableSources to support nested data
[ https://issues.apache.org/jira/browse/FLINK-5281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823098#comment-15823098 ] ASF GitHub Bot commented on FLINK-5281: --- GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/3124 [FLINK-5281] Extend KafkaJsonTableSources to support nested data Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed I've added support for serialization and deserialization of nested Rows in the JSON serializer/deserializer. I tried not to change interfaces, but I wonder if it would make sense to replace all `String[] fieldNames, TypeInformation[] fieldTypes` pairs with `RowTypeInfo`. This would change the interface (which we are doing anyway), but it would make the interfaces clearer and help avoid some code duplication. @fhueske, what do you think about this? 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink json-nested-table-source Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3124.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3124 commit 3463919b94dd11796ad71d4983644322be38a209 Author: Ivan Mushketyk Date: 2017-01-14T14:33:51Z [FLINK-5281] Extend KafkaJsonTableSources to support nested data > Extend KafkaJsonTableSources to support nested data > --- > > Key: FLINK-5281 > URL: https://issues.apache.org/jira/browse/FLINK-5281 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Affects Versions: 1.2.0 > Reporter: Fabian Hueske > Assignee: Ivan Mushketyk > > The {{TableSource}} currently does not support nested data. > Once FLINK-5280 is fixed, the KafkaJsonTableSources should be extended to > support nested input data. The nested data should be produced as {{Row}}s > nested in {{Row}}s.