[GitHub] flink issue #3081: [FLINK-5426] Clean up the Flink Machine Learning library

2017-01-15 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3081
  
I've rebased both branches with master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5426) Clean up the Flink Machine Learning library

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823579#comment-15823579
 ] 

ASF GitHub Bot commented on FLINK-5426:
---

Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3081
  
I've rebased both branches with master.


> Clean up the Flink Machine Learning library
> ---
>
> Key: FLINK-5426
> URL: https://issues.apache.org/jira/browse/FLINK-5426
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>
> Hi Guys,
> I would like to clean up the Machine Learning library. A lot of the code in 
> the ML Library does not conform to the original contribution guide. For 
> example:
> Duplicate tests with different names but exactly the same test case:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala#L148
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala#L164
> Lots of multi-line test cases:
> https://github.com/Fokko/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
> Misuse of constants:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala#L58
> Please allow me to clean this up; I'm looking forward to contributing more 
> code, especially to the ML part. I have been a contributor to Apache Spark 
> and am happy to extend the codebase with new distributed algorithms and make 
> the codebase more mature.
> Cheers, Fokko



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins

2017-01-15 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-5498:
---
Description: 
I found the expected result of a unit test case to be incorrect compared to 
that of an RDBMS, see 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
{code:title=JoinITCase.scala}
def testRightJoinWithNotOnlyEquiJoin(): Unit = {
 ...
 val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

 val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
 
 val expected = "Hello world,BCD\n"
 val results = joinT.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}

Then I took some time to learn about outer joins in relational databases; the 
right result of the above case should be (tested in SQL Server and MySQL, the 
results are the same):
{code}
> select c, g from tuple3 right outer join tuple5 on a=f and b < h
{code}
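
To make the semantics concrete, here is a minimal plain-Scala sketch (with 
made-up data, not the actual CollectionDataSets tuples) of why a non-equi 
predicate in the ON clause of a right outer join may only restrict which left 
rows match, never drop right-side rows:
{code}
object RightOuterJoinSketch extends App {
  // Models RIGHT OUTER JOIN ... ON left.id = right.id AND right.v > 3.
  // Unmatched right rows still appear, padded with null on the left side.
  val left  = Seq((1, "a"), (2, "b"))
  val right = Seq((1, 10), (2, 1), (3, 5))

  val joined = right.map { case (id, v) =>
    val matchedLeft = left.find { case (lid, _) => lid == id && v > 3 }
    (matchedLeft.map(_._2).orNull, id, v)
  }
  joined.foreach(println) // (a,1,10), (null,2,1), (null,3,5): all right rows survive
}
{code}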

[jira] [Commented] (FLINK-5426) Clean up the Flink Machine Learning library

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823577#comment-15823577
 ] 

ASF GitHub Bot commented on FLINK-5426:
---

Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3081
  
Hi @tillrohrmann, did you find any time to check #3077 and this PR?


> Clean up the Flink Machine Learning library
> ---
>
> Key: FLINK-5426
> URL: https://issues.apache.org/jira/browse/FLINK-5426
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>
> Hi Guys,
> I would like to clean up the Machine Learning library. A lot of the code in 
> the ML Library does not conform to the original contribution guide. For 
> example:
> Duplicate tests with different names but exactly the same test case:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala#L148
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala#L164
> Lots of multi-line test cases:
> https://github.com/Fokko/flink/blob/master/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
> Misuse of constants:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala#L58
> Please allow me to clean this up; I'm looking forward to contributing more 
> code, especially to the ML part. I have been a contributor to Apache Spark 
> and am happy to extend the codebase with new distributed algorithms and make 
> the codebase more mature.
> Cheers, Fokko



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3081: [FLINK-5426] Clean up the Flink Machine Learning library

2017-01-15 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3081
  
Hi @tillrohrmann, did you find any time to check #3077 and this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang reassigned FLINK-5499:


Assignee: Zhijiang Wang

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Currently, when a scheduled execution requests a slot from {{SlotPool}}, the 
> {{TaskManagerLocation}} parameter is an empty collection, so in a task-failover 
> scenario the new execution attempt may be deployed to different task managers. 
> If RocksDB is set as the state backend, performance is better when the data 
> can be restored from the local machine. So we try to reuse the 
> {{TaskManagerLocation}} of the prior execution attempt when allocating a slot 
> from {{SlotPool}}. If the {{TaskManagerLocation}} from prior executions is 
> empty, the behavior is the same as the current behavior.
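
A minimal sketch of the intended preference, assuming a hypothetical, heavily 
simplified slot-allocation signature (the real {{SlotPool}} API differs):
{code}
object PreferredLocationSketch {
  case class TaskManagerLocation(host: String)

  // Prefer a location from the prior attempt while it still has a free slot,
  // so a RocksDB restore can hit local state; otherwise fall back to any
  // free slot, which matches the current behavior.
  def allocateSlot(freeSlots: Map[TaskManagerLocation, Int],
                   priorLocations: Iterable[TaskManagerLocation]): Option[TaskManagerLocation] =
    priorLocations.find(loc => freeSlots.getOrElse(loc, 0) > 0)
      .orElse(freeSlots.collectFirst { case (loc, n) if n > 0 => loc })
}
{code}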



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Description: Currently, when a scheduled execution requests a slot from 
{{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so 
in a task-failover scenario the new execution attempt may be deployed to 
different task managers. If RocksDB is set as the state backend, performance is 
better when the data can be restored from the local machine. So we try to reuse 
the {{TaskManagerLocation}} of the prior execution attempt when allocating a 
slot from {{SlotPool}}. If the {{TaskManagerLocation}} from prior executions is 
empty, the behavior is the same as the current behavior.  (was: Currently, when 
a scheduled execution requests a slot from {{SlotPool}}, the 
{{TaskManagerLocation}} parameter is an empty collection, so in a task-failover 
scenario the new execution attempt may be deployed to different task managers. 
If RocksDB is set as the state backend, performance is better when the data can 
be restored from the local machine. So we try to reuse the TaskManagerLocation 
of the prior execution attempt when allocating a slot from SlotPool. If the 
TaskManagerLocation from prior executions is empty, the behavior is the same as 
the current behavior.)

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently, when a scheduled execution requests a slot from {{SlotPool}}, the 
> {{TaskManagerLocation}} parameter is an empty collection, so in a task-failover 
> scenario the new execution attempt may be deployed to different task managers. 
> If RocksDB is set as the state backend, performance is better when the data 
> can be restored from the local machine. So we try to reuse the 
> {{TaskManagerLocation}} of the prior execution attempt when allocating a slot 
> from {{SlotPool}}. If the {{TaskManagerLocation}} from prior executions is 
> empty, the behavior is the same as the current behavior.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Description: Currently, when a scheduled execution requests a slot from 
{{SlotPool}}, the {{TaskManagerLocation}} parameter is an empty collection, so 
in a task-failover scenario the new execution attempt may be deployed to 
different task managers. If RocksDB is set as the state backend, performance is 
better when the data can be restored from the local machine. So we try to reuse 
the TaskManagerLocation of the prior execution attempt when allocating a slot 
from SlotPool. If the TaskManagerLocation from prior executions is empty, the 
behavior is the same as the current behavior.  (was: Currently, when a scheduled 
execution requests a slot from SlotPool, the TaskManagerLocation parameter is 
an empty collection, so in a task-failover scenario the new execution attempt 
may be deployed to different task managers. If RocksDB is set as the state 
backend, performance is better when the data can be restored from the local 
machine. So we try to reuse the TaskManagerLocation of the prior execution 
attempt when allocating a slot from SlotPool. If the TaskManagerLocation from 
prior executions is empty, the behavior is the same as the current behavior.)

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently, when a scheduled execution requests a slot from {{SlotPool}}, the 
> {{TaskManagerLocation}} parameter is an empty collection, so in a task-failover 
> scenario the new execution attempt may be deployed to different task managers. 
> If RocksDB is set as the state backend, performance is better when the data 
> can be restored from the local machine. So we try to reuse the 
> TaskManagerLocation of the prior execution attempt when allocating a slot from 
> SlotPool. If the TaskManagerLocation from prior executions is empty, the 
> behavior is the same as the current behavior.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins

2017-01-15 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-5498:
---
Description: 
I found the expected result of a unit test case to be incorrect compared to 
that of an RDBMS, see 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
{code:title=JoinITCase.scala}
def testRightJoinWithNotOnlyEquiJoin(): Unit = {
 ...
 val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

 val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
 
 val expected = "Hello world,BCD\n"
 val results = joinT.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}

Then I took some time to learn about outer joins in relational databases; the 
right result of the above case should be (tested in SQL Server and MySQL, the 
results are the same):
{code}
> select c, g from tuple3 right outer join tuple5 on a=f and b < h
{code}

[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt in allocating slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Summary: Try to reuse the resource location of prior execution attempt in 
allocating slot  (was: Try to reuse the resource location of prior execution 
attempt when allocate slot)

> Try to reuse the resource location of prior execution attempt in allocating 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently, when a scheduled execution requests a slot from SlotPool, the 
> TaskManagerLocation parameter is an empty collection, so in a task-failover 
> scenario the new execution attempt may be deployed to different task managers. 
> If RocksDB is set as the state backend, performance is better when the data 
> can be restored from the local machine. So we try to reuse the 
> TaskManagerLocation of the prior execution attempt when allocating a slot from 
> SlotPool. If the TaskManagerLocation from prior executions is empty, the 
> behavior is the same as the current behavior.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5499) Try to reuse the resource location of prior execution attempt when allocate slot

2017-01-15 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5499:
-
Summary: Try to reuse the resource location of prior execution attempt when 
allocate slot  (was: Try to reuse the resource location of previous execution 
attempt when allocate slot)

> Try to reuse the resource location of prior execution attempt when allocate 
> slot
> 
>
> Key: FLINK-5499
> URL: https://issues.apache.org/jira/browse/FLINK-5499
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: Zhijiang Wang
>
> Currently, when a scheduled execution requests a slot from SlotPool, the 
> TaskManagerLocation parameter is an empty collection, so in a task-failover 
> scenario the new execution attempt may be deployed to different task managers. 
> If RocksDB is set as the state backend, performance is better when the data 
> can be restored from the local machine. So we try to reuse the 
> TaskManagerLocation of the prior execution attempt when allocating a slot from 
> SlotPool. If the TaskManagerLocation from prior executions is empty, the 
> behavior is the same as the current behavior.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins

2017-01-15 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-5498:
---
Description: 
I found the expected result of a unit test case to be incorrect compared to 
that of an RDBMS, see 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
{code:title=JoinITCase.scala}
def testRightJoinWithNotOnlyEquiJoin(): Unit = {
 ...
 val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

 val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
 
 val expected = "Hello world,BCD\n"
 val results = joinT.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}

Then I took some time to learn about outer joins in relational databases; the 
right result of the above case should be (tested in SQL Server and MySQL, the 
results are the same):
{code}
> select c, g from tuple3 right outer join tuple5 on a=f and b < h
{code}

[jira] [Created] (FLINK-5499) Try to reuse the resource location of previous execution attempt when allocate slot

2017-01-15 Thread Zhijiang Wang (JIRA)
Zhijiang Wang created FLINK-5499:


 Summary: Try to reuse the resource location of previous execution 
attempt when allocate slot
 Key: FLINK-5499
 URL: https://issues.apache.org/jira/browse/FLINK-5499
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: Zhijiang Wang


Currently, when a scheduled execution requests a slot from SlotPool, the 
TaskManagerLocation parameter is an empty collection, so in a task-failover 
scenario the new execution attempt may be deployed to different task managers. 
If RocksDB is set as the state backend, performance is better when the data can 
be restored from the local machine. So we try to reuse the TaskManagerLocation 
of the prior execution attempt when allocating a slot from SlotPool. If the 
TaskManagerLocation from prior executions is empty, the behavior is the same as 
the current behavior.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins

2017-01-15 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-5498:
---
Description: 
I found the expected result of a unit test case to be incorrect compared to 
that of an RDBMS, see 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
{code:title=JoinITCase.scala}
def testRightJoinWithNotOnlyEquiJoin(): Unit = {
 ...
 val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

 val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
 
 val expected = "Hello world,BCD\n"
 val results = joinT.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
}
{code}

Then I took some time to learn about outer joins in relational databases; the 
right result of the above case should be (tested in SQL Server and MySQL, the 
results are the same):
{panel}
> select c, g from tuple3 right outer join tuple5 on a=f and b < h
{panel}

[jira] [Created] (FLINK-5498) Fix JoinITCase and add support for filter expressions on the On clause in left/right outer joins

2017-01-15 Thread lincoln.lee (JIRA)
lincoln.lee created FLINK-5498:
--

 Summary: Fix JoinITCase and add support for filter expressions on 
the On clause in left/right outer joins
 Key: FLINK-5498
 URL: https://issues.apache.org/jira/browse/FLINK-5498
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.1.4
Reporter: lincoln.lee
Assignee: lincoln.lee






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823450#comment-15823450
 ] 

ASF GitHub Bot commented on FLINK-3475:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3111#discussion_r96160082
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
 ---
@@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
 val util = batchTestUtil()
 val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
 
-val resultTable = sourceTable.groupBy('a)
+// Move "where" before "groupBy" because the former query would generate
+// nondeterministic plans with the same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
+// the importance of the RelNode may change and the test may fail. This issue is
+// mentioned in FLINK-5394; we can move "where" back to the end once FLINK-5394 is fixed.
+val resultTable = sourceTable.where('a === 1).groupBy('a)
--- End diff --

It might make sense to wait on this until #3058 is in; I think it is almost 
done.


> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Chengxiang Li
>Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823451#comment-15823451
 ] 

ASF GitHub Bot commented on FLINK-3475:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3111#discussion_r96160329
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java
 ---
@@ -0,0 +1,1144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet;
+
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Planner rule that expands distinct aggregates
+ * (such as {@code COUNT(DISTINCT x)}) from a
+ * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
+ *
+ * How this is done depends upon the arguments to the function. If all
+ * functions have the same argument
+ * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
+ * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} 
is
+ * sufficient.
+ *
+ * If there are multiple arguments
+ * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
+ * the rule creates separate {@code Aggregate}s and combines using a
+ * {@link org.apache.calcite.rel.core.Join}.
+ */
+public final class FlinkAggregateExpandDistinctAggregatesRule extends 
RelOptRule {
--- End diff --

I would like to move this class to the `org.apache.flink.table.calcite` 
package, and add a comment at the top of the class noting that this is a 
temporary solution that should be removed later, such as: 

>This is a copy of Calcite's [[AggregateExpandDistinctAggregatesRule]] with 
a quick fix to avoid the bad case mentioned in CALCITE-1558. We should drop it 
and use Calcite's [[AggregateExpandDistinctAggregatesRule]] when upgrading to 
Calcite 1.12 or above.
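
As an aside, the single-argument case the Javadoc above describes can be 
sketched over plain Scala collections (illustrative data only, not the rule's 
actual RelNode rewrite):
{code}
object DistinctExpansionSketch extends App {
  // (groupKey, x) rows; COUNT(DISTINCT x) and SUM(DISTINCT x) share the
  // argument x, so one extra aggregate over the distinct rows suffices
  // and no join is needed.
  val rows = Seq((1, 10), (1, 10), (1, 20), (2, 30))

  val result = rows.distinct            // the extra Aggregate on DISTINCT x
    .groupBy(_._1)
    .map { case (k, vs) => (k, vs.size, vs.map(_._2).sum) }

  result.foreach(println)               // (1,2,30) and (2,1,30)
}
{code}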



> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> 

[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

2017-01-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3111#discussion_r96160329
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/FlinkAggregateExpandDistinctAggregatesRule.java
 ---
@@ -0,0 +1,1144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet;
+
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Planner rule that expands distinct aggregates
+ * (such as {@code COUNT(DISTINCT x)}) from a
+ * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
+ *
+ * How this is done depends upon the arguments to the function. If all
+ * functions have the same argument
+ * (e.g. {@code COUNT(DISTINCT x), SUM(DISTINCT x)} both have the argument
+ * {@code x}) then one extra {@link org.apache.calcite.rel.core.Aggregate} 
is
+ * sufficient.
+ *
+ * If there are multiple arguments
+ * (e.g. {@code COUNT(DISTINCT x), COUNT(DISTINCT y)})
+ * the rule creates separate {@code Aggregate}s and combines using a
+ * {@link org.apache.calcite.rel.core.Join}.
+ */
+public final class FlinkAggregateExpandDistinctAggregatesRule extends 
RelOptRule {
--- End diff --

I would like to move this class to the `org.apache.flink.table.calcite` 
package, and add a comment at the top of the class noting that this is a 
temporary solution that should be removed later, such as: 

>This is a copy of Calcite's [[AggregateExpandDistinctAggregatesRule]] with 
a quick fix to avoid the bad case mentioned in CALCITE-1558. We should drop it 
and use Calcite's [[AggregateExpandDistinctAggregatesRule]] when upgrading to 
Calcite 1.12 or above.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

2017-01-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3111#discussion_r96160082
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
 ---
@@ -165,9 +165,13 @@ class AggregationTest extends TableTestBase {
 val util = batchTestUtil()
 val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
 
-val resultTable = sourceTable.groupBy('a)
+// Move "where" before "groupBy" because the former query would generate
+// nondeterministic plans with the same cost. If we change FlinkRuleSets.DATASET_OPT_RULES,
+// the importance of the RelNode may change and the test may fail. This issue is
+// mentioned in FLINK-5394; we can move "where" back to the end once FLINK-5394 is fixed.
+val resultTable = sourceTable.where('a === 1).groupBy('a)
--- End diff --

It might make sense to wait on this until #3058 is in; I think it is almost 
done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4930) Implement FLIP-6 YARN client

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823397#comment-15823397
 ] 

ASF GitHub Bot commented on FLINK-4930:
---

Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2935


> Implement FLIP-6 YARN client
> 
>
> Key: FLINK-4930
> URL: https://issues.apache.org/jira/browse/FLINK-4930
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>Assignee: shuai.xu
>
> The FLIP-6 YARN client can follow parts of the existing YARN client.
> The main difference is that it does not wait for the cluster to be fully 
> started and for all TaskManagers to register. It simply submits 
>   - Set up all configurations and environment variables
>   - Set up the resources: Flink jar, utility jars (logging), user jar
>   - Set up attached tokens / certificates
>   - Submit the Yarn application
>   - Listen for leader / attempt to connect to the JobManager to subscribe to 
> updates
>   - Integration with the Flink CLI (command line interface)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2935: [FLINK-4930] [client, yarn] Implement FLIP-6 YARN ...

2017-01-15 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/2935


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11

2017-01-15 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823395#comment-15823395
 ] 

Jark Wu commented on FLINK-5414:


Calcite 1.11 has been released. I will start working on this issue in the 
coming days.

> Bump up Calcite version to 1.11
> ---
>
> Key: FLINK-5414
> URL: https://issues.apache.org/jira/browse/FLINK-5414
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> The upcoming Calcite release 1.11 has a lot of stability fixes and new 
> features. We should update it for the Table API.
> E.g. we can hopefully merge FLINK-4864



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5118) Inconsistent records sent/received metrics

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823272#comment-15823272
 ] 

ASF GitHub Bot commented on FLINK-5118:
---

Github user xhumanoid commented on the issue:

https://github.com/apache/flink/pull/3106
  
@zentol I asked because you always check for null when you try writing to the 
Counter. Or is it to prevent an uninitialized state?


> Inconsistent records sent/received metrics
> --
>
> Key: FLINK-5118
> URL: https://issues.apache.org/jira/browse/FLINK-5118
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
> Fix For: 1.2.0, 1.3.0
>
>
> In 1.2-SNAPSHOT, running a large-scale job, you see that the counts for 
> sent/received records are inconsistent; e.g. in a simple word-count job we 
> see more received records/bytes than sent. This is a regression from 
> 1.1, where everything works as expected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3106: [FLINK-5118] Fix inconsistent numBytesIn/Out metrics

2017-01-15 Thread xhumanoid
Github user xhumanoid commented on the issue:

https://github.com/apache/flink/pull/3106
  
@zentol I asked because you always check for null when you try writing to the 
Counter. Or is it to prevent an uninitialized state?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5118) Inconsistent records sent/received metrics

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823265#comment-15823265
 ] 

ASF GitHub Bot commented on FLINK-5118:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3106
  
@xhumanoid The metrics returned by the TaskIOMetricGroup can't actually be 
null, so I wouldn't put too much thought into dealing with that case.


> Inconsistent records sent/received metrics
> --
>
> Key: FLINK-5118
> URL: https://issues.apache.org/jira/browse/FLINK-5118
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
> Fix For: 1.2.0, 1.3.0
>
>
> In 1.2-SNAPSHOT, running a large-scale job, you see that the counts for 
> sent/received records are inconsistent; e.g. in a simple word-count job we 
> see more received records/bytes than sent. This is a regression from 
> 1.1, where everything works as expected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3106: [FLINK-5118] Fix inconsistent numBytesIn/Out metrics

2017-01-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3106
  
@xhumanoid The metrics returned by the TaskIOMetricGroup can't actually be 
null, so I wouldn't put too much thought into dealing with that case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5497) remove duplicated tests

2017-01-15 Thread Alexey Diomin (JIRA)
Alexey Diomin created FLINK-5497:


 Summary: remove duplicated tests
 Key: FLINK-5497
 URL: https://issues.apache.org/jira/browse/FLINK-5497
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Alexey Diomin
Priority: Minor


Now we have tests which run the same code 4 times, each run taking 17+ seconds.

We need to do a small refactoring and remove the duplicated code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5118) Inconsistent records sent/received metrics

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823250#comment-15823250
 ] 

ASF GitHub Bot commented on FLINK-5118:
---

Github user xhumanoid commented on the issue:

https://github.com/apache/flink/pull/3106
  
@zentol 
what do you think about removing 

if (numBytesOut != null) {

and replacing

 numBytesOut = metrics.getNumBytesOutCounter();

with

+ if (metrics.getNumBytesOutCounter() != null) {
+     numBytesOut = metrics.getNumBytesOutCounter();
+ } else {
+     numBytesOut = new NullCounter();
+ }

where NullCounter has an empty implementation for every method?

pro:
we do the null check in one place, because otherwise we may sometimes forget to do it

con:
sometimes we break devirtualization and inlining for the counter.inc(..) method
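
For reference, a minimal Scala sketch of that null-object counter (NullCounter 
itself is hypothetical; the method set mirrors Flink's 
org.apache.flink.metrics.Counter interface):
{code}
class NullCounter extends org.apache.flink.metrics.Counter {
  // Every mutator is a no-op, so callers never need a null check.
  override def inc(): Unit = {}
  override def inc(n: Long): Unit = {}
  override def dec(): Unit = {}
  override def dec(n: Long): Unit = {}
  override def getCount: Long = 0L
}
{code}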


> Inconsistent records sent/received metrics
> --
>
> Key: FLINK-5118
> URL: https://issues.apache.org/jira/browse/FLINK-5118
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
> Fix For: 1.2.0, 1.3.0
>
>
> In 1.2-SNAPSHOT, running a large-scale job, you see that the counts for 
> sent/received records are inconsistent; e.g. in a simple word-count job we 
> see more received records/bytes than sent. This is a regression from 
> 1.1, where everything works as expected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3106: [FLINK-5118] Fix inconsistent numBytesIn/Out metrics

2017-01-15 Thread xhumanoid
Github user xhumanoid commented on the issue:

https://github.com/apache/flink/pull/3106
  
@zentol 
what do you think about removing 

if (numBytesOut != null) {

and replacing

 numBytesOut = metrics.getNumBytesOutCounter();

with

+ if (metrics.getNumBytesOutCounter() != null) {
+     numBytesOut = metrics.getNumBytesOutCounter();
+ } else {
+     numBytesOut = new NullCounter();
+ }

where NullCounter has an empty implementation for every method?

pro:
we do the null check in one place, because otherwise we may sometimes forget to do it

con:
sometimes we break devirtualization and inlining for the counter.inc(..) method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823235#comment-15823235
 ] 

ASF GitHub Bot commented on FLINK-5355:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145735
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
--- End diff --

nit: I think the Javadoc formatting here is inconsistent with the other 
methods (line change).


> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException: performing an exponential backoff and 
> retrying a finite number of times before throwing an exception.
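
A minimal sketch of that retry scheme, with hypothetical names and constants 
(the connector's actual recoverability check and retry configuration live in 
KinesisProxy):
{code}
object BackoffSketch {
  // Bounded retries with exponential backoff. Which exceptions count as
  // recoverable is left to the caller (e.g. service-side 5xx errors).
  def withBackoff[T](maxRetries: Int, baseMillis: Long, attempt: Int = 0)
                    (isRecoverable: Throwable => Boolean)(call: () => T): T =
    try call() catch {
      case e: Exception if attempt < maxRetries && isRecoverable(e) =>
        Thread.sleep(baseMillis << attempt) // baseMillis * 2^attempt
        withBackoff(maxRetries, baseMillis, attempt + 1)(isRecoverable)(call)
    }
}
{code}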



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823236#comment-15823236
 ] 

ASF GitHub Bot commented on FLINK-5355:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145793
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
--- End diff --

In Flink we generally try to avoid asterisk imports.
The style check doesn't actually cover the test scope, but it is good to be 
consistent with the style rules in tests as well.


> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException: performing an exponential backoff and 
> retrying a finite number of times before throwing an exception.



--
This message was 

[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823233#comment-15823233
 ] 

ASF GitHub Bot commented on FLINK-5355:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145784
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the KinesisProxy class.
--- End diff --

The `KinesisProxy` reference should be a link, like in the other Javadocs.


> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's 

[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823231#comment-15823231
 ] 

ASF GitHub Bot commented on FLINK-5355:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145771
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the KinesisProxy class.
+ * 
--- End diff --

Extra empty line


> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException: performing an exponential backoff and 
> retrying a finite number of times before throwing an exception.

[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823232#comment-15823232
 ] 

ASF GitHub Bot commented on FLINK-5355:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145761
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
+* @return true if the exception can be recovered from, 
else
+* false
+*/
+   protected static boolean isRecoverableException(AmazonServiceException 
ex) {
+   if (ex.getErrorType() == null) {
+   return false;
+   }
+
+   switch (ex.getErrorType()) {
+   case Client:
+   if (ex instanceof 
ProvisionedThroughputExceededException) {
--- End diff --

It'll probably be cleaner to just do `ex instanceof ProvisionedThroughputExceededException`.
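
For reference, a sketch of the method with the suggested simplification and with the switch cases indented; the non-throttling branches below are assumptions about the PR's intent, not its final code:

{code:java}
protected static boolean isRecoverableException(AmazonServiceException ex) {
    if (ex.getErrorType() == null) {
        return false;
    }

    switch (ex.getErrorType()) {
        case Client:
            // only throttling errors are considered recoverable on the client side
            return ex instanceof ProvisionedThroughputExceededException;
        case Service:
            // server-side failures (e.g. the 500s in the issue) are worth retrying
            return true;
        case Unknown:
        default:
            return false;
    }
}
{code}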


> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException: performing an exponential backoff and 
> retrying a finite number of times before throwing an exception.
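
For illustration, a minimal sketch of such a retry loop; the wrapper method, constants and the simplified helper below are illustrative, not the connector's actual implementation:

{code:java}
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;

public class BackoffSketch {

    private static final int MAX_RETRIES = 3;
    private static final long BASE_BACKOFF_MILLIS = 100L;

    static GetRecordsResult getRecordsWithBackoff(AmazonKinesis client, GetRecordsRequest request)
            throws InterruptedException {
        AmazonServiceException lastException = null;
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            try {
                return client.getRecords(request);
            } catch (AmazonServiceException ex) {
                if (!isRecoverableException(ex)) {
                    throw ex;
                }
                lastException = ex;
                // exponential backoff: 100 ms, 200 ms, 400 ms, ...
                Thread.sleep(BASE_BACKOFF_MILLIS << attempt);
            }
        }
        // retries exhausted: surface the last recoverable failure
        throw lastException;
    }

    /** Simplified stand-in for the isRecoverableException helper proposed in the PR. */
    static boolean isRecoverableException(AmazonServiceException ex) {
        return ex.getErrorType() == AmazonServiceException.ErrorType.Service
                || ex instanceof ProvisionedThroughputExceededException;
    }
}
{code}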



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5355) Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823234#comment-15823234
 ] 

ASF GitHub Bot commented on FLINK-5355:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145754
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
+* @return true if the exception can be recovered from, 
else
+* false
+*/
+   protected static boolean isRecoverableException(AmazonServiceException 
ex) {
+   if (ex.getErrorType() == null) {
+   return false;
+   }
+
+   switch (ex.getErrorType()) {
--- End diff --

The indentation for the cases here seems to be missing.


> Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
> ---
>
> Key: FLINK-5355
> URL: https://issues.apache.org/jira/browse/FLINK-5355
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Scott Kidder
>Assignee: Scott Kidder
>
> My Flink job that consumes from a Kinesis stream must be restarted at least 
> once daily due to an uncaught AmazonKinesisException when reading from 
> Kinesis. The complete stacktrace looks like:
> {noformat}
> com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: 
> AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: 
> dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
>   at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
>   at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
>   at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> It's interesting that the Kinesis endpoint returned a 500 status code, but 
> that's outside the scope of this issue.
> I think we can handle this exception in the same manner as a 
> ProvisionedThroughputException: performing an exponential backoff and 
> retrying a finite number of times before throwing an exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145784
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the KinesisProxy class.
--- End diff --

The `KinesisProxy` reference should be a Javadoc `{@link}`, like in the other Javadocs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145735
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
--- End diff --

nit: I think the Javadoc formatting here is inconsistent with the other 
methods (the `@param` description breaks onto a new line).
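
For comparison, a single-line `@param` style (illustrative wording) would look like:

```java
/**
 * Checks whether the given exception can be recovered from by applying
 * exponential backoff.
 *
 * @param ex the exception to inspect
 * @return true if the exception is recoverable, false otherwise
 */
```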


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145771
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the KinesisProxy class.
+ * 
--- End diff --

Extra empty line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145754
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
+* @return true if the exception can be recovered from, 
else
+* false
+*/
+   protected static boolean isRecoverableException(AmazonServiceException 
ex) {
+   if (ex.getErrorType() == null) {
+   return false;
+   }
+
+   switch (ex.getErrorType()) {
--- End diff --

The indentation for the cases here seems to be missing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145761
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -253,6 +262,35 @@ public String getShardIterator(KinesisStreamShard 
shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}
 
+   /**
+* Determines whether the exception can be recovered from using
+* exponential-backoff
+* 
+* @param ex
+*Exception to inspect
+* @return true if the exception can be recovered from, 
else
+* false
+*/
+   protected static boolean isRecoverableException(AmazonServiceException 
ex) {
+   if (ex.getErrorType() == null) {
+   return false;
+   }
+
+   switch (ex.getErrorType()) {
+   case Client:
+   if (ex instanceof 
ProvisionedThroughputExceededException) {
--- End diff --

It'll probably be cleaner to just do `ex instanceof ProvisionedThroughputExceededException`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3078: [FLINK-5355] Handle AmazonKinesisException gracefu...

2017-01-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3078#discussion_r96145793
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.*;
--- End diff --

In Flink we generally try to avoid asterisk imports.
The style check doesn't actually cover the test scope, but it would be good to 
stay consistent with the style rules in tests as well.
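
For example, importing only the assertions the test actually uses (a hypothetical selection):

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
```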


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5496) ClassCastException when using Mesos HA mode

2017-01-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5496:


 Summary: ClassCastException when using Mesos HA mode
 Key: FLINK-5496
 URL: https://issues.apache.org/jira/browse/FLINK-5496
 Project: Flink
  Issue Type: Bug
  Components: Mesos
Affects Versions: 1.2.0, 1.3.0
Reporter: Till Rohrmann
Priority: Critical
 Fix For: 1.2.0, 1.3.0


When using the Mesos' HA mode, one cannot start the Mesos appmaster, because 
the following class cast exception occurs:

{code}
java.lang.ClassCastException: 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl 
cannot be cast to 
org.apache.flink.mesos.shaded.org.apache.curator.framework.CuratorFramework
at 
org.apache.flink.mesos.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:38)
at 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.createWorkerStore(MesosApplicationMasterRunner.java:510)
at 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.runPrivileged(MesosApplicationMasterRunner.java:320)
at 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner$1.call(MesosApplicationMasterRunner.java:178)
at 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner$1.call(MesosApplicationMasterRunner.java:175)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
at 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.run(MesosApplicationMasterRunner.java:175)
at 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.main(MesosApplicationMasterRunner.java:135)
{code}

It seems as if the {{flink-mesos}} module relocates the curator dependency into 
a different namespace than {{flink-runtime}} does. Not sure why this is done. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5495) ZooKeeperMesosWorkerStore cannot be instantiated

2017-01-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5495:


 Summary: ZooKeeperMesosWorkerStore cannot be instantiated
 Key: FLINK-5495
 URL: https://issues.apache.org/jira/browse/FLINK-5495
 Project: Flink
  Issue Type: Bug
  Components: Mesos
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Priority: Critical


The {{ZooKeeperMesosWorkerStore}} cannot be instantiated because it dynamically 
instantiates a {{ZooKeeperStateHandleStore}} without providing an {{Executor}} 
to the constructor. This effectively breaks Mesos HA mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5494) Improve Mesos documentation

2017-01-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5494:


 Summary: Improve Mesos documentation
 Key: FLINK-5494
 URL: https://issues.apache.org/jira/browse/FLINK-5494
 Project: Flink
  Issue Type: Sub-task
  Components: Mesos
Affects Versions: 1.2.0
Reporter: Till Rohrmann
 Fix For: 1.2.0


Flink's Mesos documentation could benefit from more details on how to set things 
up and which parameters to use.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5493) FlinkDistributionOverlay does not properly display missing environment variables

2017-01-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5493:


 Summary: FlinkDistributionOverlay does not properly display 
missing environment variables
 Key: FLINK-5493
 URL: https://issues.apache.org/jira/browse/FLINK-5493
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.2.0, 1.3.0
Reporter: Till Rohrmann
Priority: Minor
 Fix For: 1.3.0


The class {{FlinkDistributionOverlay}} does not properly log missing 
environment variables in case of an error. This should be changed so that the 
user knows which variables have to be set.
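
A minimal sketch of the kind of check that would surface the missing variable names; the method, class and variable names are illustrative, not the actual {{FlinkDistributionOverlay}} code:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class EnvCheckSketch {

    /** Collects all missing variables so the error message names every one of them. */
    static void requireEnvironmentVariables(String... requiredVars) {
        List<String> missing = new ArrayList<>();
        for (String var : requiredVars) {
            if (System.getenv(var) == null) {
                missing.add(var);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalStateException(
                "Missing required environment variables: " + missing);
        }
    }

    public static void main(String[] args) {
        // variable names are placeholders for whatever the overlay requires
        requireEnvironmentVariables("FLINK_HOME", "FLINK_CONF_DIR");
    }
}
{code}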



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml

2017-01-15 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-5489:
--
Component/s: Build System

> maven release:prepare fails due to invalid JDOM comments in pom.xml
> ---
>
> Key: FLINK-5489
> URL: https://issues.apache.org/jira/browse/FLINK-5489
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Minor
>  Labels: newbie
> Fix For: 1.3.0
>
>
> When I was trying to publish Flink to our internal artifactory, I found out 
> that {{maven release:prepare}} has failed because the plugin complains that 
> some of the comments in pom.xml do not conform to the JDOM format:
> {noformat}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on 
> project flink-parent: Execution default-cli of goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data 
> "-
> [ERROR] This module is used a dependency in the root pom. It activates 
> shading for all sub modules
> [ERROR] through an include rule in the shading configuration. This assures 
> that Maven always generates
> [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. 
> In particular, this is needed
> [ERROR] to define the Scala version property in the root pom but not let the 
> root pom depend on Scala
> [ERROR] and thus be suffixed along with all other modules.
> [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a 
> hyphen.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823170#comment-15823170
 ] 

ASF GitHub Bot commented on FLINK-5489:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3123


> maven release:prepare fails due to invalid JDOM comments in pom.xml
> ---
>
> Key: FLINK-5489
> URL: https://issues.apache.org/jira/browse/FLINK-5489
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Minor
>  Labels: newbie
> Fix For: 1.3.0
>
>
> When I was trying to publish Flink to our internal artifactory, I found out 
> that {{maven release:prepare}} has failed because the plugin complains that 
> some of the comments in pom.xml do not conform to the JDOM format:
> {noformat}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on 
> project flink-parent: Execution default-cli of goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data 
> "-
> [ERROR] This module is used a dependency in the root pom. It activates 
> shading for all sub modules
> [ERROR] through an include rule in the shading configuration. This assures 
> that Maven always generates
> [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. 
> In particular, this is needed
> [ERROR] to define the Scala version property in the root pom but not let the 
> root pom depend on Scala
> [ERROR] and thus be suffixed along with all other modules.
> [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a 
> hyphen.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml

2017-01-15 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-5489.
---
   Resolution: Fixed
Fix Version/s: 1.3.0

Thank you for fixing this!

Resolved in master with commit 
http://git-wip-us.apache.org/repos/asf/flink/commit/e2ba042c

> maven release:prepare fails due to invalid JDOM comments in pom.xml
> ---
>
> Key: FLINK-5489
> URL: https://issues.apache.org/jira/browse/FLINK-5489
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Minor
>  Labels: newbie
> Fix For: 1.3.0
>
>
> When I was trying to publish Flink to our internal artifactory, I found out 
> that {{maven release:prepare}} has failed because the plugin complains that 
> some of the comments in pom.xml do not conform to the JDOM format:
> {noformat}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on 
> project flink-parent: Execution default-cli of goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data 
> "-
> [ERROR] This module is used a dependency in the root pom. It activates 
> shading for all sub modules
> [ERROR] through an include rule in the shading configuration. This assures 
> that Maven always generates
> [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. 
> In particular, this is needed
> [ERROR] to define the Scala version property in the root pom but not let the 
> root pom depend on Scala
> [ERROR] and thus be suffixed along with all other modules.
> [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a 
> hyphen.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3123: [FLINK-5489] maven release:prepare fails due to in...

2017-01-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3123


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5489) maven release:prepare fails due to invalid JDOM comments in pom.xml

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823168#comment-15823168
 ] 

ASF GitHub Bot commented on FLINK-5489:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3123
  
Good change. Thank you.

I'll merge it right away


> maven release:prepare fails due to invalid JDOM comments in pom.xml
> ---
>
> Key: FLINK-5489
> URL: https://issues.apache.org/jira/browse/FLINK-5489
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Minor
>  Labels: newbie
>
> When I was trying to publish Flink to our internal artifactory, I found out 
> that {{maven release:prepare}} has failed because the plugin complains that 
> some of the comments in pom.xml do not conform to the JDOM format:
> {noformat}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare (default-cli) on 
> project flink-parent: Execution default-cli of goal 
> org.apache.maven.plugins:maven-release-plugin:2.4.2:prepare failed: The data 
> "-
> [ERROR] This module is used a dependency in the root pom. It activates 
> shading for all sub modules
> [ERROR] through an include rule in the shading configuration. This assures 
> that Maven always generates
> [ERROR] an effective pom for all modules, i.e. get rid of Maven properties. 
> In particular, this is needed
> [ERROR] to define the Scala version property in the root pom but not let the 
> root pom depend on Scala
> [ERROR] and thus be suffixed along with all other modules.
> [ERROR] " is not legal for a JDOM comment: Comment data cannot start with a 
> hyphen.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3123: [FLINK-5489] maven release:prepare fails due to invalid J...

2017-01-15 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3123
  
Good change. Thank you.

I'll merge it right away


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5380) Number of outgoing records not reported in web interface

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823165#comment-15823165
 ] 

ASF GitHub Bot commented on FLINK-5380:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3068
  
+1 to merge


![image](https://cloud.githubusercontent.com/assets/89049/21963634/948c5988-db3e-11e6-8641-6089521e9d87.png)



> Number of outgoing records not reported in web interface
> 
>
> Key: FLINK-5380
> URL: https://issues.apache.org/jira/browse/FLINK-5380
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Streaming, Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.2.0, 1.3.0
>
> Attachments: outRecordsNotreported.png
>
>
> The web frontend does not report any outgoing records in the web frontend.
> The amount of data in MB is reported correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3068: [FLINK-5380] Fix task metrics reuse for single-operator c...

2017-01-15 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3068
  
+1 to merge


![image](https://cloud.githubusercontent.com/assets/89049/21963634/948c5988-db3e-11e6-8641-6089521e9d87.png)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5491) Document default settings for yarn cluster mode

2017-01-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5491:
-
Description: 
When starting a per-job cluster with {{flink run -m yarn-cluster}}, it is 
possible to configure different settings such as job manager memory, task 
manager memory and the number of slots, for example. 

All of these settings have default values which are not documented anywhere. I 
think it would be helpful to show the default values when calling {{flink run 
-h}} and also to document them online.

Some of the default values seem to be defined in 
{{AbstractYarnClusterDescriptor}}.

  was:
When starting a per-job cluster with {{flink run -m yarn-cluster}}, it is 
possible to configure different settings such as job manager memory, task 
manager memory and the number of slots, for example. 

All of these settings have default values which are not documented anywhere. I 
think it would be helpful to show the default values when calling {{flink run 
-h}} and also to document them online.


> Document default settings for yarn cluster mode
> ---
>
> Key: FLINK-5491
> URL: https://issues.apache.org/jira/browse/FLINK-5491
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0, 1.3.0
>
>
> When starting a per-job cluster with {{flink run -m yarn-cluster}}, it 
> is possible to configure different settings such as job manager memory, task 
> manager memory and the number of slots, for example. 
> All of these settings have default values which are not documented anywhere. I 
> think it would be helpful to show the default values when calling {{flink run 
> -h}} and also to document them online.
> Some of the default values seem to be defined in 
> {{AbstractYarnClusterDescriptor}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5492) BootstrapTools log wrong address of started ActorSystem

2017-01-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5492:


 Summary: BootstrapTools log wrong address of started ActorSystem
 Key: FLINK-5492
 URL: https://issues.apache.org/jira/browse/FLINK-5492
 Project: Flink
  Issue Type: Bug
Reporter: Till Rohrmann
Priority: Minor


When starting an {{ActorSystem}} via the {{BootstrapTools}}, the 
{{startActorSystem}} function logs the IP resolved from the provided hostname 
as the {{ActorSystem}} address. However, the function then uses the unresolved 
hostname to start the {{ActorSystem}}. Since Akka matches the ActorSystem's 
address against the destination address of incoming messages, we should log the 
URL that is used to start the {{ActorSystem}} rather than the resolved IP 
(messages addressed with the IP will usually be rejected).
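
For illustration, the difference between the two addresses; a minimal sketch assuming an akka.tcp URL format, not the actual {{BootstrapTools}} code:

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AddressLoggingSketch {

    static void logActorSystemAddress(String hostname, int port) throws UnknownHostException {
        String resolvedIp = InetAddress.getByName(hostname).getHostAddress();

        // Misleading: the ActorSystem is not reachable under the IP-based URL,
        // because Akka matches incoming messages against the hostname-based address.
        System.out.println("(wrong) akka.tcp://flink@" + resolvedIp + ":" + port);

        // This matches the address the ActorSystem is actually started with.
        System.out.println("(right) akka.tcp://flink@" + hostname + ":" + port);
    }

    public static void main(String[] args) throws UnknownHostException {
        logActorSystemAddress("localhost", 6123);
    }
}
{code}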



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5491) Document default settings for yarn cluster mode

2017-01-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5491:


 Summary: Document default settings for yarn cluster mode
 Key: FLINK-5491
 URL: https://issues.apache.org/jira/browse/FLINK-5491
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann
Priority: Minor


When starting a per-job cluster with {{flink run -m yarn-cluster}}, it is 
possible to configure different settings such as job manager memory, task 
manager memory and the number of slots, for example. 

All of these settings have default values which are not documented anywhere. I 
think it would be helpful to show the default values when calling {{flink run 
-h}} and also to document them online.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Deleted] (FLINK-5475) Extend DC/OS documentation

2017-01-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann deleted FLINK-5475:
-


> Extend DC/OS documentation
> --
>
> Key: FLINK-5475
> URL: https://issues.apache.org/jira/browse/FLINK-5475
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Till Rohrmann
>Priority: Minor
>
> We could extend the DC/OS documentation a little bit to include information 
> about how to submit a job (where to find the connection information) and note 
> that one has to install the DC/OS CLI in order to add the development universe.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3124: [FLINK-5281] Extend KafkaJsonTableSources to suppo...

2017-01-15 Thread mushketyk
GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/3124

[FLINK-5281] Extend KafkaJsonTableSources to support nested data

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


I've added support for serialization and deserialization of nested Rows in the 
JSON serializer/deserializer. I tried not to change interfaces, but I wonder if 
it would make sense to replace all `String[] fieldNames, TypeInformation[] 
fieldTypes` pairs with `RowTypeInfo` (see the sketch below). This will change 
the interface (but we are doing this anyway), but it will make the interfaces 
clearer and help to avoid some code duplication.
@fhueske, what do you think about this?
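
For illustration, a sketch of how a nested schema could be expressed with a single `RowTypeInfo` instead of parallel name/type arrays (a sketch of the idea, assuming the two-argument constructor shown here, not the PR's final API):

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class NestedSchemaSketch {
    public static void main(String[] args) {
        // "address" is itself a Row, nested inside the top-level Row
        RowTypeInfo addressType = new RowTypeInfo(
            new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
            new String[]{"street", "city"});

        RowTypeInfo schema = new RowTypeInfo(
            new TypeInformation<?>[]{BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, addressType},
            new String[]{"id", "name", "address"});

        System.out.println(schema);
    }
}
```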

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink json-nested-table-source

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3124


commit 3463919b94dd11796ad71d4983644322be38a209
Author: Ivan Mushketyk 
Date:   2017-01-14T14:33:51Z

[FLINK-5281] Extend KafkaJsonTableSources to support nested data




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5281) Extend KafkaJsonTableSources to support nested data

2017-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823098#comment-15823098
 ] 

ASF GitHub Bot commented on FLINK-5281:
---

GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/3124

[FLINK-5281] Extend KafkaJsonTableSources to support nested data

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


I've added support for serialization and deserialization of nested Rows in the 
JSON serializer/deserializer. I tried not to change interfaces, but I wonder if 
it would make sense to replace all `String[] fieldNames, TypeInformation[] 
fieldTypes` pairs with `RowTypeInfo`. This will change the interface (but we 
are doing this anyway), but it will make the interfaces clearer and help to 
avoid some code duplication.
@fhueske, what do you think about this?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink json-nested-table-source

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3124


commit 3463919b94dd11796ad71d4983644322be38a209
Author: Ivan Mushketyk 
Date:   2017-01-14T14:33:51Z

[FLINK-5281] Extend KafkaJsonTableSources to support nested data




> Extend KafkaJsonTableSources to support nested data
> ---
>
> Key: FLINK-5281
> URL: https://issues.apache.org/jira/browse/FLINK-5281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} does currently not support nested data. 
> Once FLINK-5280 is fixed, the KafkaJsonTableSources should be extended to 
> support nested input data. The nested data should be produced as {{Row}}s 
> nested in {{Row}}s.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)