[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927536#comment-15927536
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106343179
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -130,32 +142,76 @@ class DataStreamOverAggregate(
 val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
 val result: DataStream[Row] =
-// partitioned aggregation
-if (partitionKeys.nonEmpty) {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType)
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType)
 
-  inputDS
+inputDS
   .keyBy(partitionKeys: _*)
   .process(processFunction)
   .returns(rowTypeInfo)
   .name(aggOpName)
   .asInstanceOf[DataStream[Row]]
-}
-// non-partitioned aggregation
-else {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType,
-false)
-
-  inputDS
-
.process(processFunction).setParallelism(1).setMaxParallelism(1)
-.returns(rowTypeInfo)
-.name(aggOpName)
-.asInstanceOf[DataStream[Row]]
-}
+  } // non-partitioned aggregation
+  else {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType,
+  false)
+
+inputDS
+  .process(processFunction).setParallelism(1).setMaxParallelism(1)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  }
+result
+  }
+
+  def createBoundedAndCurrentRowProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val lowerbound: Int = AggregateUtil.getLowerBoundary(
+  logicWindow.constants,
+  overWindow.lowerBound,
+  getInput())
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverWindowFunction(
+  namedAggregates,
+  inputType)
+inputDS
+  .keyBy(partitionKeys: _*)
+  .countWindow(lowerbound,1)
--- End diff --

`lowerbound -> (lowerbound+1)`
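The suggestion above addresses an off-by-one: `ROWS BETWEEN n PRECEDING AND CURRENT ROW` covers n + 1 rows (the current row plus its n predecessors), so the backing count window needs a size of `lowerbound + 1`. Below is a minimal, self-contained Scala sketch of that sizing on a keyed stream; the names and the `sum` aggregate are illustrative, not the PR code.

{code}
import org.apache.flink.streaming.api.scala._

object BoundedOverSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val precedingRows = 2 // "2 PRECEDING" in the SQL example of this issue

    env.fromElements(("a", 1), ("a", 2), ("a", 3), ("a", 4))
      .keyBy(_._1)
      // size = precedingRows + 1 so the current row is included,
      // slide = 1 so one result is emitted per input row
      .countWindow(precedingRows + 1, 1)
      .sum(1)
      .print()

    env.execute("bounded-over-sketch")
  }
}
{code}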


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.

[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106343179
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -130,32 +142,76 @@ class DataStreamOverAggregate(
 val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
 val result: DataStream[Row] =
-// partitioned aggregation
-if (partitionKeys.nonEmpty) {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType)
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType)
 
-  inputDS
+inputDS
   .keyBy(partitionKeys: _*)
   .process(processFunction)
   .returns(rowTypeInfo)
   .name(aggOpName)
   .asInstanceOf[DataStream[Row]]
-}
-// non-partitioned aggregation
-else {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType,
-false)
-
-  inputDS
-
.process(processFunction).setParallelism(1).setMaxParallelism(1)
-.returns(rowTypeInfo)
-.name(aggOpName)
-.asInstanceOf[DataStream[Row]]
-}
+  } // non-partitioned aggregation
+  else {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType,
+  false)
+
+inputDS
+  .process(processFunction).setParallelism(1).setMaxParallelism(1)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  }
+result
+  }
+
+  def createBoundedAndCurrentRowProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val lowerbound: Int = AggregateUtil.getLowerBoundary(
+  logicWindow.constants,
+  overWindow.lowerBound,
+  getInput())
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverWindowFunction(
+  namedAggregates,
+  inputType)
+inputDS
+  .keyBy(partitionKeys: _*)
+  .countWindow(lowerbound,1)
--- End diff --

`lowerbound -> (lowerbound+1)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927534#comment-15927534
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106343147
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -130,32 +142,76 @@ class DataStreamOverAggregate(
 val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
 val result: DataStream[Row] =
-// partitioned aggregation
-if (partitionKeys.nonEmpty) {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType)
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType)
 
-  inputDS
+inputDS
   .keyBy(partitionKeys: _*)
   .process(processFunction)
   .returns(rowTypeInfo)
   .name(aggOpName)
   .asInstanceOf[DataStream[Row]]
-}
-// non-partitioned aggregation
-else {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType,
-false)
-
-  inputDS
-
.process(processFunction).setParallelism(1).setMaxParallelism(1)
-.returns(rowTypeInfo)
-.name(aggOpName)
-.asInstanceOf[DataStream[Row]]
-}
+  } // non-partitioned aggregation
+  else {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType,
+  false)
+
+inputDS
+  .process(processFunction).setParallelism(1).setMaxParallelism(1)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  }
+result
+  }
+
+  def createBoundedAndCurrentRowProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val lowerbound: Int = AggregateUtil.getLowerBoundary(
+  logicWindow.constants,
+  overWindow.lowerBound,
+  getInput())
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverWindowFunction(
+  namedAggregates,
+  inputType)
+inputDS
+  .keyBy(partitionKeys: _*)
+  .countWindow(lowerbound,1)
+  .apply(windowFunction)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  } // global non-partitioned aggregation
+  else {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverGlobalWindowFunction(
+  namedAggregates,
+  inputType)
+
+inputDS
+  .countWindowAll(lowerbound,1)
--- End diff --

`lowerbound -> (lowerbound+1)`


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> 

[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106343147
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -130,32 +142,76 @@ class DataStreamOverAggregate(
 val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
 val result: DataStream[Row] =
-// partitioned aggregation
-if (partitionKeys.nonEmpty) {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType)
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType)
 
-  inputDS
+inputDS
   .keyBy(partitionKeys: _*)
   .process(processFunction)
   .returns(rowTypeInfo)
   .name(aggOpName)
   .asInstanceOf[DataStream[Row]]
-}
-// non-partitioned aggregation
-else {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType,
-false)
-
-  inputDS
-
.process(processFunction).setParallelism(1).setMaxParallelism(1)
-.returns(rowTypeInfo)
-.name(aggOpName)
-.asInstanceOf[DataStream[Row]]
-}
+  } // non-partitioned aggregation
+  else {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType,
+  false)
+
+inputDS
+  .process(processFunction).setParallelism(1).setMaxParallelism(1)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  }
+result
+  }
+
+  def createBoundedAndCurrentRowProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val lowerbound: Int = AggregateUtil.getLowerBoundary(
+  logicWindow.constants,
+  overWindow.lowerBound,
+  getInput())
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverWindowFunction(
+  namedAggregates,
+  inputType)
+inputDS
+  .keyBy(partitionKeys: _*)
+  .countWindow(lowerbound,1)
+  .apply(windowFunction)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  } // global non-partitioned aggregation
+  else {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverGlobalWindowFunction(
+  namedAggregates,
+  inputType)
+
+inputDS
+  .countWindowAll(lowerbound,1)
--- End diff --

`lowerbound -> (lowerbound+1)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927530#comment-15927530
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106342776
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends 
StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List<String> expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream resultSet = 

[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106342776
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends 
StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List<String> expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List<String> expected = new ArrayList<>();
+   expected.add("1,0");
+  

[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927520#comment-15927520
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520
  
Hi @fhueske @godfreyhe, thanks for the review, I addressed almost all of your 
comments.

@fhueske Except for letting `TableSourceScan` be aware of whether a filter 
has been pushed down: I'm not sure `TableSourceScan` should hold this kind of 
information; I'd prefer to keep it within the various kinds of `TableSource`. 
One drawback of letting `TableSourceScan` hold such information is that when 
we copy a `TableSourceScan`, we need to take care of all of it and make sure 
it is copied correctly. In the future, if we add more extensions to 
`TableSource`, like pushing part of a query down, we will face this problem 
again.

Regarding the interface of `FilterableTableSource`, I agree with you that a 
trait containing some logic is not friendly to Java extensions, so I removed 
the default implementation of `isFilterPushedDown`; the inheriting class 
should take care of this method. And regarding the `Tuple2` issue: how about 
we pass in a mutable Java list, let the table source *pick out* the 
expressions it can handle, and return a copy of the table source that 
contains these pushed-down predicates?
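To make that shape concrete, here is a hedged Scala sketch of the interface being discussed; the method name `applyPredicate` and the stand-in types are illustrative assumptions, not the final Flink API.

{code}
import java.util.{List => JList}

// Stand-ins so the sketch compiles on its own; in Flink these would be
// org.apache.flink.table.expressions.Expression and
// org.apache.flink.table.sources.TableSource.
trait Expression
trait TableSource[T]

trait FilterableTableSource[T] extends TableSource[T] {

  /**
    * Removes the predicates this source can evaluate itself from the given
    * mutable list (leaving unsupported ones in place) and returns a copy of
    * the source that carries the accepted predicates.
    */
  def applyPredicate(predicates: JList[Expression]): FilterableTableSource[T]

  /** True if predicates have been pushed into this source. */
  def isFilterPushedDown: Boolean
}
{code}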


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3520: [FLINK-3849] [table] Add FilterableTableSource interface ...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520
  
Hi @fhueske @godfreyhe, thanks for the review, I addressed almost all of your 
comments.

@fhueske Except for letting `TableSourceScan` be aware of whether a filter 
has been pushed down: I'm not sure `TableSourceScan` should hold this kind of 
information; I'd prefer to keep it within the various kinds of `TableSource`. 
One drawback of letting `TableSourceScan` hold such information is that when 
we copy a `TableSourceScan`, we need to take care of all of it and make sure 
it is copied correctly. In the future, if we add more extensions to 
`TableSource`, like pushing part of a query down, we will face this problem 
again.

Regarding the interface of `FilterableTableSource`, I agree with you that a 
trait containing some logic is not friendly to Java extensions, so I removed 
the default implementation of `isFilterPushedDown`; the inheriting class 
should take care of this method. And regarding the `Tuple2` issue: how about 
we pass in a mutable Java list, let the table source *pick out* the 
expressions it can handle, and return a copy of the table source that 
contains these pushed-down predicates?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5978) JM WebFrontend address ConfigOption is defined in ConfigConstants

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927494#comment-15927494
 ] 

ASF GitHub Bot commented on FLINK-5978:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/3552

[FLINK-5978] Fix JM WebFrontend address ConfigOption is defined in Co…

…nfigConstants

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3552


commit 6a611f5b1649af8f40c97014267a681cec1c0df9
Author: mengji.fy 
Date:   2017-03-16T04:53:03Z

[FLINK-5978] Fix JM WebFrontend address ConfigOption is defined in 
ConfigConstants




> JM WebFrontend address ConfigOption is defined in ConfigConstants
> -
>
> Key: FLINK-5978
> URL: https://issues.apache.org/jira/browse/FLINK-5978
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS ConfigOption is defined in 
> ConfigConstants instead of JobManagerOptions.
> Additionally, the name should not contain DEFAULT_ since it doesn't even 
> define a default value...
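For illustration, a hedged Scala sketch of the direction the ticket describes: define the option through Flink's `ConfigOptions` builder inside `JobManagerOptions`, without a `DEFAULT_` prefix and without baking in a default value. The object and key names below are assumptions, not the merged change.

{code}
import org.apache.flink.configuration.{ConfigOption, ConfigOptions}

object JobManagerWebOptionsSketch {
  // Illustrative key; the real key is whatever the current ConfigConstants
  // entry defines. No default value, so the name no longer suggests one.
  val WEB_FRONTEND_ADDRESS: ConfigOption[String] =
    ConfigOptions.key("jobmanager.web.address").noDefaultValue()
}
{code}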



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3552: [FLINK-5978] Fix JM WebFrontend address ConfigOpti...

2017-03-15 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/3552

[FLINK-5978] Fix JM WebFrontend address ConfigOption is defined in Co…

…nfigConstants

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3552


commit 6a611f5b1649af8f40c97014267a681cec1c0df9
Author: mengji.fy 
Date:   2017-03-16T04:53:03Z

[FLINK-5978] Fix JM WebFrontend address ConfigOption is defined in 
ConfigConstants




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints

2017-03-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5701.

Resolution: Fixed

Additionally fixed for 1.1.5 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/6662cc6.

> FlinkKafkaProducer should check asyncException on checkpoints
> -
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0, 1.1.5, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff 
> after flushing the {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before and after flushing and 
> {{pendingRecords}} becomes 0.
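A simplified Scala sketch of that pattern (illustrative only, not the actual FlinkKafkaProducerBase code): the exception recorded by the send callback is rethrown both before and after waiting for pending records, so the checkpoint cannot succeed while a send has failed.

{code}
import java.util.concurrent.atomic.AtomicLong

class ProducerSnapshotSketch {
  private val pendingRecords = new AtomicLong(0)
  @volatile private var asyncException: Exception = _

  // Invoked from the producer's send callback (illustrative).
  def onSendComplete(error: Exception): Unit = {
    if (error != null) asyncException = error
    pendingRecords.decrementAndGet()
  }

  private def checkErroneous(): Unit = {
    val e = asyncException
    if (e != null) {
      asyncException = null
      throw new RuntimeException("Error while sending record to Kafka", e)
    }
  }

  // Checkpoint hook: return normally only if all records are acknowledged
  // and no callback reported a failure.
  def snapshotState(): Unit = {
    checkErroneous()                   // failure seen before the checkpoint
    while (pendingRecords.get() > 0) {
      checkErroneous()                 // failure while waiting for the flush
      Thread.sleep(10)
    }
    checkErroneous()                   // failure reported by the last callback
  }
}
{code}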



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927479#comment-15927479
 ] 

ASF GitHub Bot commented on FLINK-5701:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/3549


> FlinkKafkaProducer should check asyncException on checkpoints
> -
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0, 1.1.5, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff 
> after flushing the {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before and after flushing and 
> {{pendingRecords}} becomes 0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3549: [backport-1.1] [FLINK-5701] [kafka] FlinkKafkaProd...

2017-03-15 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/3549


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927464#comment-15927464
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106336720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.util
 
-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

Yes, I have cloned your code. Maybe in this case we can use a wildcard import 
(`._`), e.g. `import org.apache.flink.table.functions.aggfunctions._`. In this 
way we can reduce 53 lines to 1 line. What do you think?
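For reference, the single wildcard import that replaces the long list of per-member imports quoted in the diff above:

{code}
// One wildcard import instead of one line per SqlTypeName member:
import org.apache.calcite.sql.`type`.SqlTypeName._
{code}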


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106336720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.util
 
-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

Yes, I have cloned your code. Maybe in this case we can use a wildcard import 
(`._`), e.g. `import org.apache.flink.table.functions.aggfunctions._`. In this 
way we can reduce 53 lines to 1 line. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927457#comment-15927457
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335718
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Adds support for filtering push-down to a [[TableSource]].
+  * A [[TableSource]] extending this interface is able to filter records 
before returning.
+  */
+trait FilterableTableSource[T] extends TableSource[T] {
--- End diff --

Changed.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335718
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Adds support for filtering push-down to a [[TableSource]].
+  * A [[TableSource]] extending this interface is able to filter records 
before returning.
+  */
+trait FilterableTableSource[T] extends TableSource[T] {
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927456#comment-15927456
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335709
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): 

[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927453#comment-15927453
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): 

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335709
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * An RexVisitor to convert RexNode to 

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106335660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * An RexVisitor to convert RexNode to 

[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927451#comment-15927451
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user WangTaoTheTonic commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r106335560
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java
 ---
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CookieHandler {
+
+   public static class ClientCookieHandler extends 
ChannelInboundHandlerAdapter {
+
+   private final Logger LOG = 
LoggerFactory.getLogger(ClientCookieHandler.class);
+
+   private final String secureCookie;
+
+   final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
+   public ClientCookieHandler(String secureCookie) {
+   this.secureCookie = secureCookie;
+   }
+
+   @Override
+   public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
+   super.channelActive(ctx);
+   LOG.debug("In channelActive method of 
ClientCookieHandler");
+
+   if(this.secureCookie != null && 
this.secureCookie.length() != 0) {
+   LOG.debug("In channelActive method of 
ClientCookieHandler -> sending secure cookie");
+   final ByteBuf buffer = Unpooled.buffer(4 + 
this.secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
+   ctx.writeAndFlush(buffer);
+   }
+   }
+   }
+
+   public static class ServerCookieDecoder extends 
MessageToMessageDecoder {
+
+   private final String secureCookie;
+
+   private final List channelList = new ArrayList<>();
+
+   private final Charset DEFAULT_CHARSET = 
Charset.forName("utf-8");
+
+   private final Logger LOG = 
LoggerFactory.getLogger(ServerCookieDecoder.class);
+
+   public ServerCookieDecoder(String secureCookie) {
+   this.secureCookie = secureCookie;
+   }
+
+   /**
+* Decode from one message to an other. This method will be 
called for each written message that can be handled
+* by this encoder.
+*
+* @param ctx the {@link ChannelHandlerContext} which this 
{@link MessageToMessageDecoder} belongs to
+* @param msg the message to decode to an other one
+* @param out the {@link List} to which decoded messages should 
be added
+* @throws Exception is thrown if an error accour
+*/
+   @Override
+   protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List out) throws Exception {
+
+   LOG.debug("ChannelHandlerContext name: {}, channel: 
{}", ctx.name(), ctx.channel());
+
+   if(secureCookie == null || secureCookie.length() == 0) {
+   LOG.debug("Not validating secure cookie since 
the server configuration is not enabled to use cookie");
+  

[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2017-03-15 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r106335560
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java
 ---
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CookieHandler {
+
+   public static class ClientCookieHandler extends 
ChannelInboundHandlerAdapter {
+
+   private final Logger LOG = 
LoggerFactory.getLogger(ClientCookieHandler.class);
+
+   private final String secureCookie;
+
+   final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
+   public ClientCookieHandler(String secureCookie) {
+   this.secureCookie = secureCookie;
+   }
+
+   @Override
+   public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
+   super.channelActive(ctx);
+   LOG.debug("In channelActive method of 
ClientCookieHandler");
+
+   if(this.secureCookie != null && 
this.secureCookie.length() != 0) {
+   LOG.debug("In channelActive method of 
ClientCookieHandler -> sending secure cookie");
+   final ByteBuf buffer = Unpooled.buffer(4 + 
this.secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
+   ctx.writeAndFlush(buffer);
+   }
+   }
+   }
+
+   public static class ServerCookieDecoder extends 
MessageToMessageDecoder {
+
+   private final String secureCookie;
+
+   private final List channelList = new ArrayList<>();
+
+   private final Charset DEFAULT_CHARSET = 
Charset.forName("utf-8");
+
+   private final Logger LOG = 
LoggerFactory.getLogger(ServerCookieDecoder.class);
+
+   public ServerCookieDecoder(String secureCookie) {
+   this.secureCookie = secureCookie;
+   }
+
+   /**
+* Decode from one message to an other. This method will be 
called for each written message that can be handled
+* by this encoder.
+*
+* @param ctx the {@link ChannelHandlerContext} which this 
{@link MessageToMessageDecoder} belongs to
+* @param msg the message to decode to an other one
+* @param out the {@link List} to which decoded messages should 
be added
+* @throws Exception is thrown if an error accour
+*/
+   @Override
+   protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List out) throws Exception {
+
+   LOG.debug("ChannelHandlerContext name: {}, channel: 
{}", ctx.name(), ctx.channel());
+
+   if(secureCookie == null || secureCookie.length() == 0) {
+   LOG.debug("Not validating secure cookie since 
the server configuration is not enabled to use cookie");
+   return;
+   }
+
+   LOG.debug("Going to decode the secure cookie passed by 
the remote client");
+
+   if(channelList.contains(ctx.channel())) {

[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2017-03-15 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r106335331
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java
 ---
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CookieHandler {
+
+   public static class ClientCookieHandler extends 
ChannelInboundHandlerAdapter {
+
+   private final Logger LOG = 
LoggerFactory.getLogger(ClientCookieHandler.class);
+
+   private final String secureCookie;
+
+   final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
+   public ClientCookieHandler(String secureCookie) {
+   this.secureCookie = secureCookie;
+   }
+
+   @Override
+   public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
+   super.channelActive(ctx);
+   LOG.debug("In channelActive method of 
ClientCookieHandler");
+
+   if(this.secureCookie != null && 
this.secureCookie.length() != 0) {
+   LOG.debug("In channelActive method of 
ClientCookieHandler -> sending secure cookie");
+   final ByteBuf buffer = Unpooled.buffer(4 + 
this.secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
+   ctx.writeAndFlush(buffer);
+   }
+   }
+   }
+
+   public static class ServerCookieDecoder extends 
MessageToMessageDecoder {
+
+   private final String secureCookie;
+
+   private final List channelList = new ArrayList<>();
--- End diff --

Is it better to use a `Set` instead of a `List` here? It is mainly used for lookups.
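
For illustration, a minimal sketch of what the `Set`-based variant could look like; the `ValidatedChannels` helper and its method names are hypothetical, not part of the patch, and a concurrent set is assumed in case the decoder instance is shared across event-loop threads:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;

// Hypothetical helper tracking channels whose cookie has already been validated.
// A hash-based Set gives O(1) contains()/add(), while ArrayList.contains() is O(n).
public class ValidatedChannels {

    // concurrent set, in case the decoder is shared across event-loop threads
    private final Set<Channel> validated = ConcurrentHashMap.newKeySet();

    public boolean isValidated(Channel channel) {
        return validated.contains(channel);
    }

    public void markValidated(Channel channel) {
        validated.add(channel);
    }

    public void forget(Channel channel) {
        validated.remove(channel);
    }
}
```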


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927446#comment-15927446
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user WangTaoTheTonic commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r106335331
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java
 ---
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CookieHandler {
+
+   public static class ClientCookieHandler extends 
ChannelInboundHandlerAdapter {
+
+   private final Logger LOG = 
LoggerFactory.getLogger(ClientCookieHandler.class);
+
+   private final String secureCookie;
+
+   final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
+   public ClientCookieHandler(String secureCookie) {
+   this.secureCookie = secureCookie;
+   }
+
+   @Override
+   public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
+   super.channelActive(ctx);
+   LOG.debug("In channelActive method of 
ClientCookieHandler");
+
+   if(this.secureCookie != null && 
this.secureCookie.length() != 0) {
+   LOG.debug("In channelActive method of 
ClientCookieHandler -> sending secure cookie");
+   final ByteBuf buffer = Unpooled.buffer(4 + 
this.secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
+   
buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
+   ctx.writeAndFlush(buffer);
+   }
+   }
+   }
+
+   public static class ServerCookieDecoder extends 
MessageToMessageDecoder {
+
+   private final String secureCookie;
+
+   private final List channelList = new ArrayList<>();
--- End diff --

Is it better to use a `Set` instead of a `List` here? It is mainly used for lookups.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard
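
For reference, a minimal sketch of the length-prefixed cookie frame written by the quoted {{ClientCookieHandler}} (a 4-byte length followed by the UTF-8 cookie bytes). The {{CookieFrame}} class and its method names are illustrative only and use a plain {{ByteBuffer}} instead of Netty's {{ByteBuf}} to stay self-contained:

{code}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the cookie frame format, not the actual handler code.
public final class CookieFrame {

    // Client side: 4-byte length prefix followed by the UTF-8 encoded cookie.
    public static ByteBuffer encode(String secureCookie) {
        byte[] bytes = secureCookie.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + bytes.length);
        buf.putInt(bytes.length);
        buf.put(bytes);
        buf.flip();
        return buf;
    }

    // Server side: read the length prefix and the cookie bytes, then compare
    // against the configured shared secret.
    public static boolean decodeAndCheck(ByteBuffer buf, String expectedCookie) {
        int length = buf.getInt();
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8).equals(expectedCookie);
    }
}
{code}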



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927442#comment-15927442
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106335062
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
 ---
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.logical.rel
 
-import java.util
--- End diff --

Your current package is `org.apache.flink.table.plan.logical.rel`, which contains a `util` sub-package (`org.apache.flink.table.plan.logical.rel.util`). That conflicts with the imported `java.util`, so a bare `util.List` reference becomes ambiguous. You can try one of the following two solutions:
solution1:
```
import java.util.List
groupSets: List[ImmutableBitSet],
aggCalls: List[AggregateCall])
```
solution2:
```
import java.{util => JUtil}
groupSets: JUtil.List[ImmutableBitSet],
aggCalls: JUtil.List[AggregateCall])
```




> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106335062
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
 ---
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.logical.rel
 
-import java.util
--- End diff --

Your current package is `org.apache.flink.table.plan.logical.rel`, which contains a `util` sub-package (`org.apache.flink.table.plan.logical.rel.util`). That conflicts with the imported `java.util`, so a bare `util.List` reference becomes ambiguous. You can try one of the following two solutions:
solution1:
```
import java.util.List
groupSets: List[ImmutableBitSet],
aggCalls: List[AggregateCall])
```
solution2:
```
import java.{util => JUtil}
groupSets: JUtil.List[ImmutableBitSet],
aggCalls: JUtil.List[AggregateCall])
```




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927441#comment-15927441
 ] 

ASF GitHub Bot commented on FLINK-5701:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3549
  
Thanks for the review!
Travis is green (local branch), with only test timeouts: 
https://travis-ci.org/tzulitai/flink

Merging this ..


> FlinkKafkaProducer should check asyncException on checkpoints
> -
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0, 1.1.5, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful only if, after 
> flushing, {{pendingRecords == 0}} and {{asyncException == null}} (currently, 
> we're only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before the flush and after {{pendingRecords}} 
> reaches 0.
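
A minimal sketch of that fix, assuming a producer that counts {{pendingRecords}} and stores the last exception reported by the Kafka send callback; the class, field, and method names below are illustrative only, not the actual FlinkKafkaProducer code:

{code}
// Illustrative sketch only, not the actual FlinkKafkaProducer implementation.
public class PendingRecordsSketch {

    private final Object pendingLock = new Object();
    private long pendingRecords;                // ++ on invoke(), -- in the send callback
    private volatile Exception asyncException;  // set by the send callback on failure

    // the send callback is expected to decrement pendingRecords and notify pendingLock

    private void checkAsyncException() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw e;
        }
    }

    // called on each checkpoint
    public void snapshotState() throws Exception {
        // 1) fail the checkpoint immediately if an earlier send already failed
        checkAsyncException();

        // 2) wait until all in-flight records have been acknowledged
        synchronized (pendingLock) {
            while (pendingRecords > 0) {
                pendingLock.wait(100);
            }
        }

        // 3) a callback may have reported a failure while we were waiting
        checkAsyncException();
    }
}
{code}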



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3549: [backport-1.1] [FLINK-5701] [kafka] FlinkKafkaProducer sh...

2017-03-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3549
  
Thanks for the review!
Travis is green (local branch), with only test timeouts: 
https://travis-ci.org/tzulitai/flink

Merging this ..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior

2017-03-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5048.

   Resolution: Fixed
Fix Version/s: (was: 1.1.5)

> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation 
> behavior
> -
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The latter case leads to 
> situations where cancellations get very slow unless that thread is 
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its 
> pulled batch of records into a blocking queue (size one)
>   - The main thread of the task pulls the record batches from the 
> blocking queue and emits the records.
> This actually allows for some additional I/O overlap while limiting the 
> additional memory consumption: only two batches are ever held, one being 
> fetched and one being emitted.
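
A minimal sketch of such a hand-over between the two threads, assuming a generic {{Batch}} placeholder type for the records pulled from the KafkaConsumer; the class and method names are illustrative only, not the actual connector code:

{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of a size-one hand-over between the consumer thread
// and the task's main thread.
public class HandoverSketch<Batch> {

    // capacity 1: at most one batch sits between the threads, so only two
    // batches exist at any time (one being fetched, one being emitted)
    private final BlockingQueue<Batch> handover = new ArrayBlockingQueue<>(1);

    // runs in the spawned consumer thread; blocks while the main thread is
    // still emitting the previous batch (natural back pressure)
    public void produce(Batch batch) throws InterruptedException {
        handover.put(batch);
    }

    // runs in the task's main thread; blocks until the next batch is available
    // and reacts to the regular task interrupt/cancellation path
    public Batch nextBatch() throws InterruptedException {
        return handover.take();
    }
}
{code}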



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior

2017-03-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927427#comment-15927427
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5048 at 3/16/17 3:43 AM:
-

Agreed, let's mark this issue as resolved and remove {{1.1.5}} from the fix 
versions then.


was (Author: tzulitai):
Agreed, let's resolve this issue and remove {{1.1.5}} from the fix versions 
then.

> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation 
> behavior
> -
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.5
>
>
> The {{FLinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The later case leads to 
> situations where cancellations get very slow unless that thread would be 
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pull from the KafkaConsumer and pushes its 
> pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the 
> blocking queue and emit the records.
> This allows actually for some additional I/O overlay while limiting the 
> additional memory consumption - only two batches are ever held, one being 
> fetched and one being emitted.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior

2017-03-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927427#comment-15927427
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5048:


Agreed, let's resolve this issue and remove {{1.1.5}} from the fix versions 
then.

> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation 
> behavior
> -
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.5
>
>
> The {{FLinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The later case leads to 
> situations where cancellations get very slow unless that thread would be 
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pull from the KafkaConsumer and pushes its 
> pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the 
> blocking queue and emit the records.
> This allows actually for some additional I/O overlay while limiting the 
> additional memory consumption - only two batches are ever held, one being 
> fetched and one being emitted.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5978) JM WebFrontend address ConfigOption is defined in ConfigConstants

2017-03-15 Thread fang yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927411#comment-15927411
 ] 

fang yong commented on FLINK-5978:
--

Hi Chesnay Schepler, I'm glad to see an easy issue while I'm new here. Could you please assign it to me?

> JM WebFrontend address ConfigOption is defined in ConfigConstants
> -
>
> Key: FLINK-5978
> URL: https://issues.apache.org/jira/browse/FLINK-5978
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.3.0
>
>
> The DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS ConfigOption is defined in 
> ConfigConstants instead of JobManagerOptions.
> Additionally, the name should not contain DEFAULT_ since it doesn't even 
> define a default value...
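
A minimal sketch of what the moved option could look like in {{JobManagerOptions}}, assuming Flink's {{ConfigOptions}} builder; the key string shown here is a placeholder taken over from the existing constant's intent, not necessarily the real key:

{code}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class JobManagerOptionsSketch {

    // hypothetical: the key must match the one currently defined in ConfigConstants;
    // note the DEFAULT_ prefix is dropped from the constant name
    public static final ConfigOption<String> WEB_FRONTEND_ADDRESS =
        ConfigOptions.key("jobmanager.web.address").noDefaultValue();
}
{code}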



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, 
DataSetCalc}
+import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+operand(classOf[BatchTableSourceScan], none)),
+  "PushFilterIntoBatchTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+val scan: BatchTableSourceScan = 
call.rel(1).asInstanceOf[BatchTableSourceScan]
+scan.tableSource match {
+  case _: FilterableTableSource[_] =>
+calc.getProgram.getCondition != null
--- End diff --

Will add


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329225
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, 
StreamTableSourceScan}
+import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+val scan: StreamTableSourceScan = 
call.rel(1).asInstanceOf[StreamTableSourceScan]
+scan.tableSource match {
+  case _: FilterableTableSource[_] =>
+calc.getProgram.getCondition != null
--- End diff --

Will add


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927379#comment-15927379
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, 
DataSetCalc}
+import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+operand(classOf[BatchTableSourceScan], none)),
+  "PushFilterIntoBatchTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+val scan: BatchTableSourceScan = 
call.rel(1).asInstanceOf[BatchTableSourceScan]
+scan.tableSource match {
+  case _: FilterableTableSource[_] =>
+calc.getProgram.getCondition != null
--- End diff --

Will add


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106328751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
+val tableEnv: TableEnvironment,
--- End diff --

Yes, you are right. Especially since UDFs are currently registered as objects rather than classes, it's really impossible for a TableSource to support this. I will remove this field and only use built-in functions when extracting expressions from the RexProgram.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927380#comment-15927380
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329225
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, 
StreamTableSourceScan}
+import 
org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule")
+  with PushFilterIntoTableSourceScanRuleBase {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+val scan: StreamTableSourceScan = 
call.rel(1).asInstanceOf[StreamTableSourceScan]
+scan.tableSource match {
+  case _: FilterableTableSource[_] =>
+calc.getProgram.getCondition != null
--- End diff --

Will add


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-15 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * An RexVisitor to convert RexNode to 

[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927377#comment-15927377
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329079
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableSourceScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource[_],
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
--- End diff --

Will change this.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927378#comment-15927378
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106329099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): 


[jira] [Updated] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-15 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5650:
-
Labels: osx  (was: )

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0
>Reporter: shijinkui
>Priority: Critical
>  Labels: osx
> Fix For: 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it waits more than half an 
> hour after the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> this is the jstack:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927371#comment-15927371
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106328751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable[T](
 val tableSource: TableSource[T],
+val tableEnv: TableEnvironment,
--- End diff --

Yes, you are right. Especially since UDFs are currently registered as objects 
rather than classes, it's really impossible to let a TableSource support this. I 
will remove this field and only use built-in functions when extracting 
expressions from the RexProgram.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927368#comment-15927368
 ] 

ASF GitHub Bot commented on FLINK-5981:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3486
  
Since it hasn't been merged yet, I've turned them around. Sorry for the carelessness :(


> SSL version and ciper suites cannot be constrained as configured
> 
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> I configured SSL and started a Flink job, but found that the configured properties 
> do not apply properly:
> akka port: only the cipher suites apply correctly, the SSL version does not
> blob server/netty server: neither the SSL version nor the cipher suites match what 
> I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for 
> the blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for the akka SSL 
> version; it's fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue on the blob server and netty server; on the akka side it seems 
> only an akka upgrade can solve it (we'll consider that later, as an upgrade 
> is not a small action).
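
As a rough illustration of the fix direction on the blob server / Netty side, a sketch using plain JDK SSL calls (the helper name and parameters are assumptions, not the actual patch): `SSLContext.getInstance(...)` alone does not restrict the enabled protocols, so they and the cipher suites have to be set explicitly on each engine.
{code}
import javax.net.ssl.{SSLContext, SSLEngine}

// Build an engine whose protocol versions and cipher suites are restricted
// to what the user configured, instead of relying on the context defaults.
def createRestrictedEngine(
    context: SSLContext,
    enabledProtocols: Array[String],    // e.g. Array("TLSv1.2") - placeholder
    enabledCipherSuites: Array[String]  // the configured cipher suite list - placeholder
  ): SSLEngine = {
  val engine = context.createSSLEngine()
  engine.setEnabledProtocols(enabledProtocols)
  engine.setEnabledCipherSuites(enabledCipherSuites)
  engine
}
{code}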



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5498) Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927355#comment-15927355
 ] 

ASF GitHub Bot commented on FLINK-5498:
---

Github user lincoln-lil commented on the issue:

https://github.com/apache/flink/pull/3379
  
Some of the integration tests occasionally fail when submitting this PR, so I 
configured Travis CI for my repository and got a full pass; see 
https://travis-ci.org/lincoln-lil/flink/builds/208811166

I've rebased onto master, updated this PR, and finished a successful local 
verification.
```
[INFO] BUILD SUCCESS
[INFO] 

[INFO] Total time: 36:39 min
[INFO] Finished at: 2017-03-16T10:12:34+08:00
[INFO] Final Memory: 231M/1187M
```
 
It would be appreciated if someone could review this PR.



> Add support for left/right outer joins with non-equality predicates (and 1+ 
> equality predicates)
> 
>
> Key: FLINK-5498
> URL: https://issues.apache.org/jira/browse/FLINK-5498
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> I found the expected result of a unit test case to be incorrect compared to that in 
> an RDBMS, 
> see 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
>  ...
>  val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
> 'c)
>  val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
> 'f, 'g, 'h)
>  val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>  
>  val expected = "Hello world,BCD\n"
>  val results = joinT.toDataSet[Row].collect()
>  TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about the ‘outer join’ in relational 
> databases; the right result of the above case should be (tested in SQL Server and 
> MySQL, the results are the same):
> {code}
> > select c, g from tuple3 right outer join tuple5 on a=f and b<h
> c    g
> NULL Hallo   
> NULL Hallo Welt  
> NULL Hallo Welt wie  
> NULL Hallo Welt wie gehts?   
> NULL ABC 
> Hello world  BCD 
> NULL CDE 
> NULL DEF 
> NULL EFG 
> NULL FGH 
> NULL GHI 
> NULL HIJ 
> NULL IJK 
> NULL JKL 
> NULL KLM   
> {code}
> the join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent 
> to {{rightOuterJoin('a === 'd).where('b < 'h)}}.  
> The problem is rooted in the code-generated {{JoinFunction}} (see 
> {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not 
> match, we must emit the outer row padded with nulls instead of returning from 
> the function without emitting anything.
> The code-generated {{JoinFunction}} also includes the equality predicates. 
> These should be removed before generating the code, e.g., in 
> {{DataSetJoinRule}} when generating the {{DataSetJoin}} with the help of 
> {{JoinInfo.getRemaining()}}.
> More details: https://goo.gl/ngekca
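
To make the difference concrete, a small Table API sketch (illustrative only, reusing the `ds1`/`ds2` tables from the quoted test): the first query keeps every right-side row and pads the left side with nulls when the non-equi predicate fails, while the second drops those padded rows afterwards.
{code}
// Assumes the ds1/ds2 tables from the test above and the usual
// org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ imports.

// Non-equi predicate evaluated as part of the outer join: right-side rows
// without a match under (a === d && b < h) are still emitted, padded with nulls.
val outerJoined = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)

// NOT equivalent: the join matches on equality only and the range predicate is
// applied afterwards, which also filters out the null-padded rows.
val joinThenFilter = ds1.rightOuterJoin(ds2, 'a === 'd).where('b < 'h).select('c, 'g)
{code}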



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927353#comment-15927353
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106327707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRuleCall
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.plan.nodes.TableSourceScan
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.sources.FilterableTableSource
+
+trait PushFilterIntoTableSourceScanRuleBase {
+
+  private[flink] def pushFilterIntoScan(
+  call: RelOptRuleCall,
+  calc: Calc,
+  scan: TableSourceScan,
+  tableSourceTable: TableSourceTable[_],
+  filterableSource: FilterableTableSource[_],
+  description: String): Unit = {
+
+if (filterableSource.isFilterPushedDown) {
+  // The rule can get triggered again due to the transformed "scan => 
filter"
+  // sequence created by the earlier execution of this rule when we 
could not
+  // push all the conditions into the scan
+  return
+}
+
+val program = calc.getProgram
+val (predicates, unconvertedRexNodes) =
+  RexProgramExtractor.extractConjunctiveConditions(
+program,
+call.builder().getRexBuilder,
+tableSourceTable.tableEnv.getFunctionCatalog)
+if (predicates.isEmpty) {
+  // no condition can be translated to expression
+  return
+}
+
+val (newTableSource, remainingPredicates) = 
filterableSource.applyPredicate(predicates)
+// trying to apply filter push down, set the flag to true no matter 
whether
+// we actually push any filters down.
+newTableSource.setFilterPushedDown(true)
+
+// check whether framework still need to do a filter
+val relBuilder = call.builder()
+val remainingCondition = {
+  if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) {
+relBuilder.push(scan)
+(remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ 
unconvertedRexNodes)
+.reduce((l, r) => relBuilder.and(l, r))
+  } else {
+null
+  }
+}
+
+// check whether we still need a RexProgram. An RexProgram is needed 
when either
+// projection or filter exists.
+val newScan = scan.copy(scan.getTraitSet, newTableSource)
+val newRexProgram = {
+  if (remainingCondition != null || program.getProjectList.size() > 0) 
{
--- End diff --

Thanks for the tips, will change this.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the 


[jira] [Commented] (FLINK-5883) Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927325#comment-15927325
 ] 

ASF GitHub Bot commented on FLINK-5883:
---

Github user lincoln-lil commented on the issue:

https://github.com/apache/flink/pull/3392
  
It would be appreciated if someone could merge this PR.


> Re-adding the Exception-thrown code for ListKeyGroupedIterator when the 
> iterator is requested the second time
> -
>
> Key: FLINK-5883
> URL: https://issues.apache.org/jira/browse/FLINK-5883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>
> Originally (in FLINK-1023), ListKeyGroupedIterator ensured that a 
> TraversableOnceException was thrown when the iterator was requested a second 
> time. This behavior was unexpectedly lost in FLINK-1110, so add it back. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927322#comment-15927322
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106324953
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the filed index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row tye which the state saved
+  * @param keySelector the keyselector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choose the recent result and the records 
need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
+new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]]
+
+  /** For store timeSectionsMap **/
+  private var timeSectionsState: ListState[String] = _
+  private var inputKeySerializer: TypeSerializer[Tuple] = _
+  private var timeSerializer: TypeSerializer[TimeWindow] = _
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
interMediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+timeSerializer = new TimeWindow.Serializer
+val stateDescriptor: MapStateDescriptor[TimeWindow, Row] =
+  new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-15 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106324953
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.{ProcessFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.StringSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the filed index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param interMediateType the intermediate row tye which the state saved
+  * @param keySelector the keyselector
+  * @param keyType the key type
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val interMediateType: TypeInformation[Row],
+private val keySelector: KeySelector[Row, Tuple],
+private val keyType: TypeInformation[Tuple])
+  extends ProcessFunction[Row, Row]
+  with CheckpointedFunction{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: MapState[TimeWindow, Row] = _
+  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = 
aggregates.zipWithIndex
+
+  /** Sorted list per key for choose the recent result and the records 
need retraction **/
+  private val timeSectionsMap: java.util.HashMap[Tuple, 
java.util.LinkedList[TimeWindow]] =
+new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]]
+
+  /** For store timeSectionsMap **/
+  private var timeSectionsState: ListState[String] = _
+  private var inputKeySerializer: TypeSerializer[Tuple] = _
+  private var timeSerializer: TypeSerializer[TimeWindow] = _
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
interMediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+timeSerializer = new TimeWindow.Serializer
+val stateDescriptor: MapStateDescriptor[TimeWindow, Row] =
+  new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", 
timeSerializer, valueSerializer)
+inputKeySerializer = 
keyType.createSerializer(getRuntimeContext.getExecutionConfig)
+state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor)
+  }
+
+  override def 

[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927319#comment-15927319
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106324883
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
 ---
@@ -58,16 +60,24 @@ class BatchTableSourceScan(
 )
   }
 
+  override def copy(traitSet: RelTraitSet, newTableSource: 
TableSource[_]): TableSourceScan = {
+new BatchTableSourceScan(
+  cluster,
+  traitSet,
+  getTable,
+  newTableSource.asInstanceOf[BatchTableSource[_]]
+)
+  }
+
   override def explainTerms(pw: RelWriter): RelWriter = {
-super.explainTerms(pw)
+val terms = super.explainTerms(pw)
   .item("fields", 
TableEnvironment.getFieldNames(tableSource).mkString(", "))
+tableSource.explainTerms(terms)
--- End diff --

will change this.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927318#comment-15927318
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106324868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -39,4 +39,6 @@ trait TableSource[T] {
   /** Returns the [[TypeInformation]] for the return type of the 
[[TableSource]]. */
   def getReturnType: TypeInformation[T]
 
+  /** Describes the table source */
+  def explainTerms(pw: RelWriter): RelWriter = pw
--- End diff --

Make sense to me, will change this.


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5668) passing taskmanager configuration through taskManagerEnv instead of file

2017-03-15 Thread Bill Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927308#comment-15927308
 ] 

Bill Liu commented on FLINK-5668:
-

It still doesn't solve my problem. I want to start a Flink job without HDFS (or 
any writable shared file system).

> passing taskmanager configuration through taskManagerEnv instead of file
> 
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When creating a Flink cluster on YARN, the JobManager depends on HDFS to share 
> taskmanager-conf.yaml with the TaskManagers.
> It would be better to serve taskmanager-conf.yaml from the JobManager web server 
> instead of HDFS, which would reduce the HDFS dependency at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927296#comment-15927296
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3151
  
@tillrohrmann, just a reminder to review my modifications when you have free time, 
because this blocks my next pull request for the heartbeat between 
`TaskManager` and `ResourceManager`. Thank you! 


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhijiang
>Assignee: zhijiang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.
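
A purely illustrative sketch of that interaction (all names below are hypothetical, not the actual FLIP-6 interfaces):
{code}
// Hypothetical types for illustration only.
case class JobId(value: String)
case class JmLeaderId(value: String)
case class MetricsReport(cpuLoad: Double, heapUsedBytes: Long)

trait TaskManagerHeartbeat {
  // Called when a heartbeat from the JobManager identified by
  // (JobID, JmLeaderID) arrives; the returned payload is reported back.
  def onHeartbeatRequest(jobId: JobId, leaderId: JmLeaderId): MetricsReport
}

class SimpleTaskManagerHeartbeat extends TaskManagerHeartbeat {
  override def onHeartbeatRequest(jobId: JobId, leaderId: JmLeaderId): MetricsReport =
    // A real TaskManager would gather these from its runtime; fixed values
    // keep the sketch self-contained.
    MetricsReport(cpuLoad = 0.42, heapUsedBytes = 128L * 1024 * 1024)
}
{code}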



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5865) Throw original exception in states

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927295#comment-15927295
 ] 

ASF GitHub Bot commented on FLINK-5865:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/3380
  
I prefer to throw more detailed exceptions, e.g. 
`IncompatibleTypeSerializerException`, `StateAccessException` and 
`StateNotFoundException`, all of which extend `FlinkRuntimeException`. 
Users can get more information from these exceptions if they catch them.
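
A minimal sketch of the hierarchy proposed above, assuming simple constructors (only the class names and the `FlinkRuntimeException` parent come from the comment):
```
// Stand-in for the real base class; only the names and the parent type
// come from the comment above, the constructors are assumptions.
class FlinkRuntimeException(message: String, cause: Throwable)
  extends RuntimeException(message, cause)

class IncompatibleTypeSerializerException(message: String, cause: Throwable)
  extends FlinkRuntimeException(message, cause)

class StateAccessException(cause: Throwable)
  extends FlinkRuntimeException("Error while accessing state", cause)

class StateNotFoundException(stateName: String)
  extends FlinkRuntimeException(s"State '$stateName' does not exist", null)
```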


> Throw original exception in states
> --
>
> Key: FLINK-5865
> URL: https://issues.apache.org/jira/browse/FLINK-5865
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Now all exceptions thrown in RocksDB states are converted to 
> {{RuntimeException}}. This is unnecessary and prints useless stack traces in the 
> log.
> I think it's better to throw the original exception, without any wrapping.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927290#comment-15927290
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106322568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -42,63 +41,40 @@ class DataSetCalc(
 traitSet: RelTraitSet,
 input: RelNode,
 rowRelDataType: RelDataType,
-private[flink] val calcProgram: RexProgram, // for tests
+calcProgram: RexProgram,
 ruleDescription: String)
-  extends SingleRel(cluster, traitSet, input)
+  extends Calc(cluster, traitSet, input, calcProgram)
--- End diff --

This is because I want to unify the PushFilterIntoScan rule's code for both 
batch and stream mode. While executing the rule, we may need to create a new 
copy of the DataSetCalc or DataStreamCalc. It makes things easier to let 
these two classes inherit from `Calc` and to use `Calc.copy` to create a new 
copied instance. 

I did encounter some problems after I changed the hierarchy: some unit 
tests failed because the plan changed. But that was because we didn't calculate 
the cost for Calc correctly. I added some logic to `CommanCalc.computeSelfCost`, 
and everything works fine.
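
A condensed sketch of that idea (the helper is simplified and assumed; only `Calc.copy` itself comes from Calcite):
```
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rex.RexProgram

// With DataSetCalc and DataStreamCalc both extending Calcite's Calc, the shared
// rule can rebuild either node without knowing the concrete subclass:
// Calc.copy(traitSet, input, program) returns an instance of the same type.
def rebuildCalc(calc: Calc, newInput: RelNode, newProgram: RexProgram): Calc =
  calc.copy(calc.getTraitSet, newInput, newProgram)
```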


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927277#comment-15927277
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106321885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
 ---
@@ -24,7 +24,7 @@ package org.apache.flink.table.sources
   *
   * @tparam T The return type of the [[ProjectableTableSource]].
   */
-trait ProjectableTableSource[T] {
+trait ProjectableTableSource[T] extends TableSource[T] {
--- End diff --

It's because I want to unify the PushProjectIntoScan rule code for both 
batch and stream mode. Once we push a projection down into the table source, we 
not only have to create a new TableScan instance, but also a new TableSource 
instance. The code looks like:
```
val newTableSource = originTableSource.projectFields(usedFields)
// create a new scan with the new TableSource instance
val newScan = scan.copy(scan.getTraitSet, newTableSource) 
```
At first the `projectFields` method returned a `ProjectableTableSource`, which 
is not a `TableSource`, so I let `ProjectableTableSource` inherit from 
`TableSource`. But I just noticed we can simply let `projectFields` return a 
`TableSource`, and the problem is resolved.

Will change this.
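
In other words, the revised shape would look roughly like this (a sketch based on the comment, not the merged interface):
```
import org.apache.flink.table.sources.TableSource

// projectFields returns a plain TableSource, so the push-down rule can feed the
// result straight into scan.copy without ProjectableTableSource having to
// extend TableSource itself.
trait ProjectableTableSource[T] {
  /** Returns a copy of this source that produces only the given field indexes. */
  def projectFields(fields: Array[Int]): TableSource[T]
}
```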


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved

2017-03-15 Thread Luke Hutchison (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Hutchison closed FLINK-6026.
-
Resolution: Not A Bug

> Return type of flatMap with lambda function not correctly resolved
> --
>
> Key: FLINK-6026
> URL: https://issues.apache.org/jira/browse/FLINK-6026
> Project: Flink
>  Issue Type: Bug
>  Components: Core, DataSet API, DataStream API
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Minor
>
> I get an error if I try naming a flatMap operation:
> {code}
> DataSet> y = x.flatMap((t, out) -> 
> out.collect(t)).name("op");
> {code}
> Type mismatch: cannot convert from 
> FlatMapOperator,Object> to 
> DataSet>
> If I try to do it as two steps, I get the error that DataSet does not have a 
> .name(String) method:
> {code}
> DataSet> y = x.flatMap((t, out) -> out.collect(t));
> y.name("op");
> {code}
> If I use Eclipse type inference on x, it shows me that the output type is not 
> correctly inferred:
> {code}
> FlatMapOperator, Object> y = x.flatMap((t, out) -> 
> out.collect(t));
> y.name("op");   // This now works, but "Object" is not the output type
> {code}
> However, these steps still cannot be chained -- the following still gives an 
> error:
> {code}
> FlatMapOperator, Object> y = x.flatMap((t, out) -> 
> out.collect(t)).name("op");
> {code}
> i.e. first you have to assign the result to a field, so that the type is 
> fully specified; then you can name the operation.
> And the weird thing is that you can give the correct, more specific type for 
> the local variable, without a type narrowing error:
> {code}
> FlatMapOperator, Tuple2> y = 
> x.flatMap((t, out) -> out.collect(t));
> y.name("op");   // This works, although chaining these two lines still does 
> not work
> {code}
> If the types of the lambda args are specified, then everything works:
> {code}
> DataSet> y = x.flatMap((Tuple2 t, 
> Collector> out) -> out.collect(t)).name("op");
> {code}
> So, at least two things are going on here:
> (1) type inference is not working correctly for the lambda parameters
> (2) this breaks type inference for intermediate expressions, unless the type 
> can be resolved using a local variable definition
> Is this a bug in the type signature of flatMap? (Or a compiler bug or 
> limitation, or a fundamental limitation of Java 8 type inference?)
> It seems odd that the type of a local variable definition can make the result 
> of the flatMap operator *more* specific, taking the type from 
> {code}
> FlatMapOperator, Object>
> {code}
> to 
> {code}
> FlatMapOperator, Tuple2>
> {code}
> i.e. if the output type is provided in the local variable definition, it is 
> properly unified with the type of the parameter t of collect(t), however that 
> type is not propagated out of that call.
> Can anything be done about this in Flink? I have hit this problem a few times.
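
For reference, a hedged sketch of the usual workaround in the DataSet API: type the lambda parameters and/or supply an explicit type hint via returns(). The concrete element type Tuple2<String, Integer> is assumed here purely for illustration.
{code}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlatMapTypeHintExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<String, Integer>> x = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

    // Typing the lambda parameters fixes the compile-time inference described above;
    // the returns() hint additionally restores the generic info erased from the lambda.
    DataSet<Tuple2<String, Integer>> y = x
        .flatMap((Tuple2<String, Integer> t, Collector<Tuple2<String, Integer>> out) -> out.collect(t))
        .returns(new TypeHint<Tuple2<String, Integer>>() {})
        .name("op");

    y.print();
  }
}
{code}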



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved

2017-03-15 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927065#comment-15927065
 ] 

Luke Hutchison commented on FLINK-6026:
---

Makes sense. I was wondering if there was some sort of type signature tweak 
that could be performed to make this work. I guess not -- thanks anyway!


> Return type of flatMap with lambda function not correctly resolved
> --
>
> Key: FLINK-6026
> URL: https://issues.apache.org/jira/browse/FLINK-6026
> Project: Flink
>  Issue Type: Bug
>  Components: Core, DataSet API, DataStream API
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Minor
>
> I get an error if I try naming a flatMap operation:
> {code}
> DataSet> y = x.flatMap((t, out) -> 
> out.collect(t)).name("op");
> {code}
> Type mismatch: cannot convert from 
> FlatMapOperator,Object> to 
> DataSet>
> If I try to do it as two steps, I get the error that DataSet does not have a 
> .name(String) method:
> {code}
> DataSet> y = x.flatMap((t, out) -> out.collect(t));
> y.name("op");
> {code}
> If I use Eclipse type inference on x, it shows me that the output type is not 
> correctly inferred:
> {code}
> FlatMapOperator, Object> y = x.flatMap((t, out) -> 
> out.collect(t));
> y.name("op");   // This now works, but "Object" is not the output type
> {code}
> However, these steps still cannot be chained -- the following still gives an 
> error:
> {code}
> FlatMapOperator, Object> y = x.flatMap((t, out) -> 
> out.collect(t)).name("op");
> {code}
> i.e. first you have to assign the result to a field, so that the type is 
> fully specified; then you can name the operation.
> And the weird thing is that you can give the correct, more specific type for 
> the local variable, without a type narrowing error:
> {code}
> FlatMapOperator, Tuple2> y = 
> x.flatMap((t, out) -> out.collect(t));
> y.name("op");   // This works, although chaining these two lines still does 
> not work
> {code}
> If the types of the lambda args are specified, then everything works:
> {code}
> DataSet> y = x.flatMap((Tuple2 t, 
> Collector> out) -> out.collect(t)).name("op");
> {code}
> So, at least two things are going on here:
> (1) type inference is not working correctly for the lambda parameters
> (2) this breaks type inference for intermediate expressions, unless the type 
> can be resolved using a local variable definition
> Is this a bug in the type signature of flatMap? (Or a compiler bug or 
> limitation, or a fundamental limitation of Java 8 type inference?)
> It seems odd that the type of a local variable definition can make the result 
> of the flatMap operator *more* specific, taking the type from 
> {code}
> FlatMapOperator, Object>
> {code}
> to 
> {code}
> FlatMapOperator, Tuple2>
> {code}
> i.e. if the output type is provided in the local variable definition, it is 
> properly unified with the type of the parameter t of collect(t), however that 
> type is not propagated out of that call.
> Can anything be done about this in Flink? I have hit this problem a few times.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3414) Add Scala API for CEP's pattern definition

2017-03-15 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927063#comment-15927063
 ] 

Ivan Mushketyk commented on FLINK-3414:
---

Hi [~dawidwys]

I don't have time to work on this task now, so I don't mind if you take over.

I'll reassign it to you.

> Add Scala API for CEP's pattern definition
> --
>
> Key: FLINK-3414
> URL: https://issues.apache.org/jira/browse/FLINK-3414
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Currently, the CEP library only supports a Java API to specify complex event 
> patterns. In order to make it a bit less verbose for Scala users, it would be 
> nice to also add a Scala API for the CEP library. 
> A Scala API would also allow to pass Scala's anonymous functions as filter 
> conditions or as a select function, for example, or to use partial functions 
> to distinguish between different events.
> Furthermore, the Scala API could be designed to feel a bit more like a DSL:
> {code}
> begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || 
> "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume
> {code}
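
For contrast, a rough Java sketch of how such a pattern reads with the existing Java-only API (the Event type and its fields are made up, and the where(FilterFunction) signature is assumed from the pre-1.3 CEP API):
{code}
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.pattern.Pattern;

public class CepJavaVerbosityExample {

  // Made-up event type for illustration.
  public static class Event {
    public int id;
    public String name;
  }

  public static Pattern<Event, ?> buildPattern() {
    return Pattern.<Event>begin("start")
        .where(new FilterFunction<Event>() {
          @Override
          public boolean filter(Event value) {
            return value.id >= 42;
          }
        })
        .followedBy("end")
        .where(new FilterFunction<Event>() {
          @Override
          public boolean filter(Event value) {
            return "foobar".equals(value.name);
          }
        });
  }
}
{code}
The anonymous FilterFunction classes are exactly the boilerplate that Scala anonymous functions and a DSL would remove.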



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-3414) Add Scala API for CEP's pattern definition

2017-03-15 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-3414:
-

Assignee: Dawid Wysakowicz  (was: Ivan Mushketyk)

> Add Scala API for CEP's pattern definition
> --
>
> Key: FLINK-3414
> URL: https://issues.apache.org/jira/browse/FLINK-3414
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Dawid Wysakowicz
>Priority: Minor
>
> Currently, the CEP library only supports a Java API to specify complex event 
> patterns. In order to make it a bit less verbose for Scala users, it would be 
> nice to also add a Scala API for the CEP library. 
> A Scala API would also allow to pass Scala's anonymous functions as filter 
> conditions or as a select function, for example, or to use partial functions 
> to distinguish between different events.
> Furthermore, the Scala API could be designed to feel a bit more like a DSL:
> {code}
> begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || 
> "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks on...

2017-03-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3523
  
Can we slightly adapt the test to target the typical use case more closely:
  - Original job has some stateless ops (no uid), and some stateful ones 
(with uid)
  - Create a modified job that has the same stateful ones (same uids) but 
different stateless ones

Otherwise this looks good.
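
For illustration, a hedged sketch of that kind of job (operator and class names are made up; only the uid() calls on the stateful operators matter for the savepoint mapping):
```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 2, 1)
            // stateless operator: no uid, free to change between job versions
            .map(i -> i * 2)
            .keyBy(i -> i % 2)
            // stateful operator: uid pinned so its state is matched on restore
            .map(new CountingMapper()).uid("stateful-counter")
            .print();

        env.execute("uid example");
    }

    /** Keyed, stateful mapper that counts elements per key. */
    public static class CountingMapper extends RichMapFunction<Integer, String> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public String map(Integer value) throws Exception {
            long next = (count.value() == null ? 0L : count.value()) + 1;
            count.update(next);
            return value + " -> " + next;
        }
    }
}
```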


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926831#comment-15926831
 ] 

ASF GitHub Bot commented on FLINK-5985:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3523
  
Can we slightly adapt the test to target the typical use case more closely:
  - Original job has some stateless ops (no uid), and some stateful ones 
(with uid)
  - Create a modified job that has the same stateful ones (same uids) but 
different stateless ones

Otherwise this looks good.


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems that Flink treats every Task as stateful, so changing the topology 
> is not possible without setting a uid on every single operator.
> If the topology has an iteration, this is virtually impossible (or at least 
> gets super hacky).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior

2017-03-15 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926820#comment-15926820
 ] 

Robert Metzger commented on FLINK-5048:
---

I'm not aware of any user on 1.1 affected by this. The change is quite involved 
and could potentially break existing code. Therefore, I would not backport that 
change.

> Kafka Consumer (0.9/0.10) threading model leads problematic cancellation 
> behavior
> -
>
> Key: FLINK-5048
> URL: https://issues.apache.org/jira/browse/FLINK-5048
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.2.0, 1.1.5
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that 
> operates the KafkaConsumer. That thread is shielded from interrupts, because 
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the 
> network stack (backpressure) or in chained operators. The latter case leads to 
> situations where cancellation gets very slow unless that thread is 
> interrupted (which it cannot be).
> I propose to change the threading model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its 
> pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the 
> blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the 
> additional memory consumption - only two batches are ever held, one being 
> fetched and one being emitted.
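
A minimal sketch of such a hand-over (all names here are illustrative, not the actual connector classes): a blocking queue of capacity one decouples the Kafka-polling thread from the record-emitting task thread.
{code}
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrative hand-over: at most one fetched batch is buffered at a time. */
public class BatchHandover<T> {

    private final BlockingQueue<List<T>> queue = new ArrayBlockingQueue<>(1);

    /** Called by the consumer thread after polling a batch from Kafka. */
    public void produce(List<T> batch) throws InterruptedException {
        queue.put(batch); // blocks while the previous batch is still being emitted
    }

    /** Called by the task's main thread, which then emits the records downstream. */
    public List<T> nextBatch() throws InterruptedException {
        return queue.take(); // blocks until the consumer thread has fetched a batch
    }
}
{code}
This way the main thread can be interrupted for cancellation without touching the KafkaConsumer, and at most two batches exist at any time.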



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5962) Cancel checkpoint canceller tasks in CheckpointCoordinator

2017-03-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926812#comment-15926812
 ] 

Stephan Ewen commented on FLINK-5962:
-

Fixed in 1.2 via 9d59e008d8849cdfe2daf302e251454435bb997f

> Cancel checkpoint canceller tasks in CheckpointCoordinator
> --
>
> Key: FLINK-5962
> URL: https://issues.apache.org/jira/browse/FLINK-5962
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
>Priority: Critical
>
> The {{CheckpointCoordinator}} registers a canceller task for each running 
> checkpoint. The canceller task's responsibility is to cancel a checkpoint if 
> it takes too long to complete. We should cancel this task as soon as the 
> checkpoint has been completed, because otherwise we will keep many canceller 
> tasks around. This can eventually lead to an OOM exception.
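
A hedged sketch of the fix being described (names are illustrative, not the actual CheckpointCoordinator code): keep the ScheduledFuture of the canceller and cancel it as soon as the checkpoint completes.
{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative only: schedule a timeout canceller and revoke it on completion. */
public class CancellerSketch {

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public ScheduledFuture<?> scheduleTimeout(Runnable abortCheckpoint, long timeoutMillis) {
        return timer.schedule(abortCheckpoint, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public void onCheckpointCompleted(ScheduledFuture<?> cancellerTask) {
        // Without this, every completed checkpoint would leave its canceller queued
        // in the timer until the timeout fires, piling up and eventually causing OOM.
        cancellerTask.cancel(false);
    }
}
{code}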



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #:

2017-03-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/0a501e9f7f56baba2905002b74746998458db007#commitcomment-21337504
  
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java:
In 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 on line 84:
I think this results in unnecessary boxing of `time`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6044) TypeSerializerSerializationProxy.read() doesn't verify the read buffer length

2017-03-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926785#comment-15926785
 ] 

Stephan Ewen commented on FLINK-6044:
-

The {{read()}} / {{readFully()}} issue has occurred before, and it will most likely 
occur again.

A simple thing we can do is "forbid" that method in the same way as String/Byte 
conversions without a charset.
Take a look at the {{CheckForbiddenMethodsUsage}} test in the manual tests. We 
can add such a test for the {{read()}} method.

We only need to remember to execute the manual tests for every release.
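
For context, a small sketch of the difference using plain java.io types (not the Flink classes): read() may return fewer bytes than requested, so callers must either loop or use readFully().
{code}
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class ReadFullyExample {

    /** Unsafe: a single read() may fill only part of the buffer. */
    static int readOnce(InputStream in, byte[] buf) throws IOException {
        return in.read(buf); // number of bytes actually read, possibly < buf.length
    }

    /** Safe: readFully() keeps reading until the buffer is full, or throws EOFException. */
    static void readAll(DataInputStream in, byte[] buf) throws IOException {
        in.readFully(buf);
    }
}
{code}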

> TypeSerializerSerializationProxy.read() doesn't verify the read buffer length
> -
>
> Key: FLINK-6044
> URL: https://issues.apache.org/jira/browse/FLINK-6044
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.2.0
> Environment: Ubuntu server 12.04.5 64 bit
> java version "1.8.0_111"
> Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
>Reporter: Avihai Berkovitz
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.3.0
>
>
> The read() method of TypeSerializerSerializationProxy creates a buffer and 
> tries to fill it by calling the read() method of the given DataInputView, but 
> never checks the return value. The actual size read from the stream might be 
> smaller than the buffer size, and the rest of the buffer is filled with 
> zeroes, causing the deserialization to fail.
> It happened to me using a RocksDB state backend backed by S3. The setup was 
> done according to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#s3-simple-storage-service
>  and everything worked correctly until I upgraded to Flink 1.2.0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6044) TypeSerializerSerializationProxy.read() doesn't verify the read buffer length

2017-03-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926779#comment-15926779
 ] 

Stephan Ewen commented on FLINK-6044:
-

[~srichter] I think we should try and safeguard Flink against running into the 
same bug again.

> TypeSerializerSerializationProxy.read() doesn't verify the read buffer length
> -
>
> Key: FLINK-6044
> URL: https://issues.apache.org/jira/browse/FLINK-6044
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.2.0
> Environment: Ubuntu server 12.04.5 64 bit
> java version "1.8.0_111"
> Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
>Reporter: Avihai Berkovitz
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.3.0
>
>
> The read() method of TypeSerializerSerializationProxy creates a buffer and 
> tries to fill it by calling the read() method of the given DataInputView, but 
> never checks the return value. The actual size read from the stream might be 
> smaller than the buffer size, and the rest of the buffer is filled with 
> zeroes, causing the deserialization to fail.
> It happened to me using a RocksDB state backend backed by S3. The setup was 
> done according to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#s3-simple-storage-service
>  and everything worked correctly until I upgraded to Flink 1.2.0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926772#comment-15926772
 ] 

ASF GitHub Bot commented on FLINK-6018:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
This change may alter the format of savepoints, because it now forwards 
type-registrations to the Kryo serializer, which it did not do before.

Since we announced savepoint compatibility, we need to understand when this 
would happen. Is it something that could not really happen before (because 
whenever the method was called, the serializer was properly initialized 
outside), or did this actually occur in some cases?

@sunjincheng121 Can you share how you stumbled across this bug? Was it a 
code cleanup, or a bug you encountered while running Flink?


> Properly initialise StateDescriptor in 
> AbstractStateBackend.getPartitionedState()
> -
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The code snippet currently in the `AbstractKeyedStateBackend#getPartitionedState` 
> method is as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 354 }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>   return serializer != null;
>   }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>   if (serializer == null) { 
>   if (typeInfo != null) {
>   serializer = 
> typeInfo.createSerializer(executionConfig);
>   } else {
>   throw new IllegalStateException(
>   "Cannot initialize serializer 
> after TypeInformation was dropped during serialization");
>   }
>   }
>   }
> {code}
> That is, in the `initializeSerializerUnlessSet` method, the `serializer` is 
> already checked against `null`. So I suggest one of the following improvements:
> approach 1: 
> According to the `TODO` information  we throw an exception
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>   throw new IllegalStateException("The serializer of the 
> descriptor has not been initialized!"); 
> }
> {code}
> approach 2:
> Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
> {` logic.
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, if we use approach 2, I suggest that `AbstractKeyedStateBackend` 
> add a `private final ExecutionConfig executionConfig` field. Then we can 
> change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Are the above suggestions reasonable to you? 
> Any feedback and corrections are welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...

2017-03-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
This change may alter the format of savepoints, because it now forwards 
type-registrations to the Kryo serializer, which it did not do before.

Since we announced savepoint compatibility, we need to understand when this 
would happen. Is it something that could not really happen before (because 
whenever the method was called, the serializer was properly initialized 
outside), or did this actually occur in some cases?

@sunjincheng121 Can you share how you stumbled across this bug? Was it a 
code cleanup, or a bug you encountered while running Flink?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926770#comment-15926770
 ] 

ASF GitHub Bot commented on FLINK-6018:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
There is also another method in `AbstractKeyedStateBackend`: 
`getOrCreateKeyedState` which does not silently use an empty `ExecutionConfig` 
but throws an exception. That method could also use the same execution config, 
now that it is available.
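
A minimal sketch of that direction (class and method names here are assumptions for illustration, not the actual backend code): keep the job's ExecutionConfig in the backend and use it wherever a descriptor still needs its serializer.
{code}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.StateDescriptor;

// Illustrative only; the real class is AbstractKeyedStateBackend.
public abstract class KeyedBackendSketch {

    private final ExecutionConfig executionConfig;

    protected KeyedBackendSketch(ExecutionConfig executionConfig) {
        this.executionConfig = executionConfig;
    }

    protected void ensureSerializerInitialized(StateDescriptor<?, ?> descriptor) {
        // Use the job's config instead of a fresh ExecutionConfig(), so registered
        // types and serializers are respected consistently in all code paths.
        descriptor.initializeSerializerUnlessSet(executionConfig);
    }
}
{code}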


> Properly initialise StateDescriptor in 
> AbstractStateBackend.getPartitionedState()
> -
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The code snippet currently in the `AbstractKeyedStateBackend#getPartitionedState` 
> method is as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 354 }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>   return serializer != null;
>   }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>   if (serializer == null) { 
>   if (typeInfo != null) {
>   serializer = 
> typeInfo.createSerializer(executionConfig);
>   } else {
>   throw new IllegalStateException(
>   "Cannot initialize serializer 
> after TypeInformation was dropped during serialization");
>   }
>   }
>   }
> {code}
> That is, in the `initializeSerializerUnlessSet` method, the `serializer` is 
> already checked against `null`. So I suggest one of the following improvements:
> approach 1: 
> According to the `TODO` information  we throw an exception
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>   throw new IllegalStateException("The serializer of the 
> descriptor has not been initialized!"); 
> }
> {code}
> approach 2:
> Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
> {` logic.
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, if we use approach 2, I suggest that `AbstractKeyedStateBackend` 
> add a `private final ExecutionConfig executionConfig` field. Then we can 
> change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Are the above suggestions reasonable to you? 
> Any feedback and corrections are welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...

2017-03-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
There is also another method in `AbstractKeyedStateBackend`: 
`getOrCreateKeyedState` which does not silently use an empty `ExecutionConfig` 
but throws an exception. That method could also use the same execution config, 
now that it is available.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926768#comment-15926768
 ] 

ASF GitHub Bot commented on FLINK-6018:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
I think this could really have used a test. It is a tricky bug that will 
easily be re-introduced without a test that guards the fix.


> Properly initialise StateDescriptor in 
> AbstractStateBackend.getPartitionedState()
> -
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The code snippet currently in the `AbstractKeyedStateBackend#getPartitionedState` 
> method is as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 354 }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>   return serializer != null;
>   }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>   if (serializer == null) { 
>   if (typeInfo != null) {
>   serializer = 
> typeInfo.createSerializer(executionConfig);
>   } else {
>   throw new IllegalStateException(
>   "Cannot initialize serializer 
> after TypeInformation was dropped during serialization");
>   }
>   }
>   }
> {code}
> That is, in the `initializeSerializerUnlessSet` method, the `serializer` is 
> already checked against `null`. So I suggest one of the following improvements:
> approach 1: 
> According to the `TODO` information  we throw an exception
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>   throw new IllegalStateException("The serializer of the 
> descriptor has not been initialized!"); 
> }
> {code}
> approach 2:
> Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) 
> {` logic.
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, if we use approach 2, I suggest that `AbstractKeyedStateBackend` 
> add a `private final ExecutionConfig executionConfig` field. Then we can 
> change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Are the above suggestions reasonable to you? 
> Any feedback and corrections are welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...

2017-03-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3534
  
I think this could really have used a test. It is a tricky bug that will 
easily be re-introduced without a test that guards the fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2

2017-03-15 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926682#comment-15926682
 ] 

Till Rohrmann commented on FLINK-6063:
--

Hi Razvan,

sometimes it can take a little while until ZooKeeper notifies the TaskManagers 
about the new leader. In the meantime they try to reconnect to the old master. 
But as soon as the new leader information is written to ZooKeeper and sent to 
the TaskManagers, they should try to connect to the new leader. How long did 
you try it out? It would also be helpful to see the log of the JobManager that 
became the new leader, as well as more of the TaskManager logs.



> HA Configuration doesn't work with Flink 1.2
> 
>
> Key: FLINK-6063
> URL: https://issues.apache.org/jira/browse/FLINK-6063
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Razvan
>Priority: Critical
>
> I have a setup with a Flink 1.2 cluster, made up of 3 JobManagers and 2 
> TaskManagers. I start the ZooKeeper quorum from JobManager1, get confirmation 
> that ZooKeeper starts on the other 2 JobManagers, and then start a Flink job 
> on JobManager1.
> 
> The flink-conf.yaml is the same on all 5 VMs (as is everything else related 
> to Flink, because I copied the folder across all VMs as suggested in the 
> tutorials); this means jobmanager.rpc.address points to JobManager1 
> everywhere.
> If I turn off the VM running JobManager1, I would expect ZooKeeper to say that 
> one of the remaining JobManagers is the leader and the TaskManagers should 
> reconnect to it. Instead, a new leader is elected, but the slaves keep 
> connecting to the old master:
> 2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem 
>   - Ensuring all FileSystem streams are closed for Async 
> calls on Source: Custom Source -> Flat Map (1/1)
> 2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Disassociated] 
> 2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor  
>   - Association with remote system 
> [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] 
> ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused 
> by: [Connection refused: /1.2.3.4:44779]
> 2017-03-15 10:29:10,489 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager 
> akka://flink/user/taskmanager disconnects from JobManager 
> akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its 
> leadership.
> 2017-03-15 10:29:10,490 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - Cancelling 
> all computations and discarding all cached data.
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Source: Custom Source 
> -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
> 2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Flat Map (1/1) 
> (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager 
> disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: 
> Old JobManager lost its leadership.
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>

[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface

2017-03-15 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926684#comment-15926684
 ] 

Bowen Li commented on FLINK-5095:
-

Agreed that the MetricReporter interface is not extensible or scalable, and it 
will definitely cause problems as Flink's metrics and monitoring systems grow.

I'll take some time to think about this as well as 
https://issues.apache.org/jira/browse/FLINK-6053

> Add explicit notifyOfAddedX methods to MetricReporter interface
> ---
>
> Key: FLINK-5095
> URL: https://issues.apache.org/jira/browse/FLINK-5095
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Priority: Minor
>
> I would like to start a discussion on the MetricReporter interface, 
> specifically the methods that notify a reporter of added or removed metrics.
> Currently, the methods are defined as follows:
> {code}
> void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
> void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup 
> group);
> {code}
> All metrics, regardless of their actual type, are passed to the reporter with 
> these methods.
> Since the different metric types have to be handled differently, every 
> reporter is forced to do something like this:
> {code}
> if (metric instanceof Counter) {
> Counter c = (Counter) metric;
>   // deal with counter
> } else if (metric instanceof Gauge) {
>   // deal with gauge
> } else if (metric instanceof Histogram) {
>   // deal with histogram
> } else if (metric instanceof Meter) {
>   // deal with meter
> } else {
>   // log something or throw an exception
> }
> {code}
> This has a few issues
> * the instanceof checks and castings are unnecessary overhead
> * it requires the implementer to be aware of every metric type
> * it encourages throwing an exception in the final else block
> We could remedy all of these by reworking the interface to contain explicit 
> add/remove methods for every metric type. This would however be a breaking 
> change and blow up the interface to 12 methods from the current 4. We could 
> also add a RichMetricReporter interface with these methods, which would 
> require relatively few changes but add additional complexity.
> I was wondering what other people think about this.
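
A hedged sketch of what the explicit-methods variant could look like (method names are assumed from the issue title; this is not an existing Flink interface):
{code}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;

/** Sketch only: one typed notification per metric kind, no instanceof dispatch. */
public interface RichMetricReporterSketch {

    void notifyOfAddedCounter(Counter counter, String metricName, MetricGroup group);
    void notifyOfRemovedCounter(Counter counter, String metricName, MetricGroup group);

    void notifyOfAddedGauge(Gauge<?> gauge, String metricName, MetricGroup group);
    void notifyOfRemovedGauge(Gauge<?> gauge, String metricName, MetricGroup group);

    void notifyOfAddedHistogram(Histogram histogram, String metricName, MetricGroup group);
    void notifyOfRemovedHistogram(Histogram histogram, String metricName, MetricGroup group);

    void notifyOfAddedMeter(Meter meter, String metricName, MetricGroup group);
    void notifyOfRemovedMeter(Meter meter, String metricName, MetricGroup group);
}
{code}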



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2017-03-15 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926665#comment-15926665
 ] 

Bowen Li commented on FLINK-6053:
-

Cool. Adding a NumberGauge or NumericGauge was my first thought when running 
into this problem. But not being able to extend AbstractReporter might not be 
good. I need more time to think through a solution. I appreciate your input!

BTW, can you please add me as contributor to this project, so I can assign 
tickets to myself?
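
For what it's worth, a minimal sketch of the NumberGauge idea mentioned above (the name and placement are assumptions): a numeric-only sub-interface would give reporters a stronger contract without breaking the existing Gauge.
{code}
import org.apache.flink.metrics.Gauge;

/** Sketch only: a gauge restricted to numeric values. */
public interface NumberGauge<T extends Number> extends Gauge<T> {
    // Inherits T getValue(); reporters that need numbers can depend on this
    // sub-interface while the existing Gauge<T> contract stays untouched.
}
{code}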

> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
> Fix For: 2.0.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number. 
> It also blocks me from finishing FLINK-6013, because I cannot assume Gauge is 
> only about numbers. So the interface should look like:
> ```java
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926662#comment-15926662
 ] 

Stephan Ewen commented on FLINK-5756:
-

Just validated that compactions actually help, but compactions are equally slow 
when many values are merged.

It's also the case that multiple gets on the same key take long, not only the 
first get.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the 
> same key in ListState, the windowState.get() operation performs very poorly. 
> I also tried RocksDB version 4.11.2, and the performance is also very 
> poor. The problem is likely related to RocksDB's own get() operation 
> after using merge(). This may affect the window operator's 
> performance when the ListState grows very large. I tried to merge 5 
> values under the same key in RocksDB, and it took 120 seconds to execute the get() 
> operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...

2017-03-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3483#discussion_r106138741
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.Collections;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest {
+
+   /**
+* This test takes a snapshot that was created with Flink 1.2 and tries 
to restore it in Flink 1.3 to check
+* the backwards compatibility of the serialization format of {@link 
StateTable}s.
+*/
+   @Test
+   public void testHeapKeyedStateBackend1_2To1_3BackwardsCompatibility() 
throws Exception {
+
+   ClassLoader cl = getClass().getClassLoader();
+   URL resource = 
cl.getResource("heap_keyed_statebackend_1_2.snapshot");
--- End diff --

Please also add the code (commented out) that is used to re-generate this 
snapshot.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...

2017-03-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3483#discussion_r106137929
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ---
@@ -0,0 +1,1021 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+/**
+ * Basis for Flink's in-memory state tables with copy-on-write support. 
This map does not support null values for
--- End diff --

Is this the basis or the actual implementation? 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...

2017-03-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3483#discussion_r106132349
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyContext.java 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * This interface is the current context of a keyed state. It provides 
information about the currently selected key in
+ * the context, the corresponding key-group, and other key and 
key-grouping related information.
+ * 
+ * The typical use case for this interface is providing a view on the 
current-key selection aspects of
+ * {@link org.apache.flink.runtime.state.KeyedStateBackend}.
+ */
+public interface KeyContext {
--- End diff --

We had a quick offline discussion about this because there already exists a 
`KeyContext` in Flink. Maybe you can rename this one to `InternalKeyContext` 
for now because it serves somewhat different purposes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106238200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.util
 
-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

After an inspection, I realized that the imports you mentioned are used. I 
think there are no unused imports at the moment. Am I missing something?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926658#comment-15926658
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106238200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.util
 
-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

After an inspection, I realized that the imports you mentioned are used. I 
think there are no unused imports at the moment. Am I missing something?


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6065) Make TransportClient for ES5 pluggable

2017-03-15 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-6065:
-

 Summary: Make TransportClient for ES5 pluggable
 Key: FLINK-6065
 URL: https://issues.apache.org/jira/browse/FLINK-6065
 Project: Flink
  Issue Type: Improvement
  Components: ElasticSearch Connector, Streaming Connectors
Reporter: Robert Metzger


This JIRA is based on a user request: 
http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454

Currently, in the {{Elasticsearch5ApiCallBridge}} the 
{{PreBuiltTransportClient}} is hardcoded. It would be nice to make this client 
pluggable to allow using other clients such as the 
{{PreBuiltXPackTransportClient}}.
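
A hedged sketch of one way to make it pluggable (the factory interface and its shape are assumptions for illustration, not the actual connector code):
{code}
import java.io.Serializable;
import java.util.Map;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;

/** Sketch only: lets users supply e.g. a PreBuiltXPackTransportClient. */
public interface TransportClientFactory extends Serializable {

    /** Creates the TransportClient that the sink will use to talk to the cluster. */
    TransportClient createClient(Map<String, String> userConfig, Settings settings);
}
{code}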



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

2017-03-15 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-6018.
---
   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed on master in
264f6df8e0c0fb2f9dfb0cd9beab9d380dc8e00c

> Properly initialise StateDescriptor in 
> AbstractStateBackend.getPartitionedState()
> -
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The code snippet currently in the `AbstractKeyedStateBackend#getPartitionedState` 
> method is as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 354 }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>   return serializer != null;
>   }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>   if (serializer == null) { 
>   if (typeInfo != null) {
>   serializer = 
> typeInfo.createSerializer(executionConfig);
>   } else {
>   throw new IllegalStateException(
>   "Cannot initialize serializer 
> after TypeInformation was dropped during serialization");
>   }
>   }
>   }
> {code}
> That is, the `initializeSerializerUnlessSet` method already performs the 
> `serializer == null` check itself. So I suggest improving this code in one of 
> the following ways:
> Approach 1: 
> Following the `TODO` comment, throw an exception:
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>   throw new IllegalStateException("The serializer of the 
> descriptor has not been initialized!"); 
> }
> {code}
> Approach 2:
> Always initialize, and remove the 
> `if (!stateDescriptor.isSerializerInitialized()) {` check:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, if we go with approach 2, I suggest that 
> `AbstractKeyedStateBackend` add a `private final ExecutionConfig 
> executionConfig` field. Then we can change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Do the above suggestions sound reasonable to you? 
> Any feedback and corrections are welcome.
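For reference, here is a minimal, runnable illustration of why the extra guard is redundant; the `ValueStateDescriptor` is just an example descriptor, not tied to the backend code above:
{code}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class SerializerInitDemo {

    public static void main(String[] args) {
        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("demo", BasicTypeInfo.STRING_TYPE_INFO);

        // first call creates the serializer from the TypeInformation
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        // second call is a no-op because the serializer is already non-null
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        System.out.println(descriptor.isSerializerInitialized()); // prints true
    }
}
{code}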



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

2017-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926635#comment-15926635
 ] 

ASF GitHub Bot commented on FLINK-6018:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3534
  
Thanks for your contribution! I just merged.

Could you please close this PR?


> Properly initialise StateDescriptor in 
> AbstractStateBackend.getPartitionedState()
> -
>
> Key: FLINK-6018
> URL: https://issues.apache.org/jira/browse/FLINK-6018
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
> Fix For: 1.3.0
>
>
> The code snippet currently in the `AbstractKeyedStateBackend#getPartitionedState` 
> method is as follows:
> {code}
> line 352: // TODO: This is wrong, it should throw an exception that the 
> initialization has not properly happened
> line 353: if (!stateDescriptor.isSerializerInitialized()) {
> line 354:     stateDescriptor.initializeSerializerUnlessSet(new 
> ExecutionConfig());
> line 355: }
> {code}
> Method `isSerializerInitialized`:
> {code}
> public boolean isSerializerInitialized() {
>   return serializer != null;
>   }
> {code}
> Method `initializeSerializerUnlessSet`:
> {code}
> public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
>   if (serializer == null) { 
>   if (typeInfo != null) {
>   serializer = 
> typeInfo.createSerializer(executionConfig);
>   } else {
>   throw new IllegalStateException(
>   "Cannot initialize serializer 
> after TypeInformation was dropped during serialization");
>   }
>   }
>   }
> {code}
> That is, the `initializeSerializerUnlessSet` method already performs the 
> `serializer == null` check itself. So I suggest improving this code in one of 
> the following ways:
> Approach 1: 
> Following the `TODO` comment, throw an exception:
> {code}
> if (!stateDescriptor.isSerializerInitialized()) {
>   throw new IllegalStateException("The serializer of the 
> descriptor has not been initialized!"); 
> }
> {code}
> Approach 2:
> Always initialize, and remove the 
> `if (!stateDescriptor.isSerializerInitialized()) {` check:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
> {code}
> Meanwhile, if we go with approach 2, I suggest that 
> `AbstractKeyedStateBackend` add a `private final ExecutionConfig 
> executionConfig` field. Then we can change the code like this:
> {code}
> stateDescriptor.initializeSerializerUnlessSet(executionConfig);
> {code}
> Do the above suggestions sound reasonable to you? 
> Any feedback and corrections are welcome.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...

2017-03-15 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3534
  
Thanks for your contribution! 😃 I just merged.

Could you please close this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...

2017-03-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3483#discussion_r106136980
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.Collections;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest {
--- End diff --

All other tests for backwards compatibility end in `*MigrationTest`; we 
should probably stick to this naming pattern.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...

2017-03-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3483#discussion_r106133243
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
 ---
@@ -47,77 +44,62 @@
/**
 * Creates a new key/value state for the given hash map of key/value 
pairs.
 *
-* @param backend The state backend backing that created this state.
 * @param stateDesc The state identifier for the state. This contains 
name
 *   and can create a default state value.
 * @param stateTable The state table to use in this key/value state. 
May contain initial state.
 */
protected AbstractHeapMergingState(
-   KeyedStateBackend backend,
SD stateDesc,
StateTable stateTable,
TypeSerializer keySerializer,
TypeSerializer namespaceSerializer) {
 
-   super(backend, stateDesc, stateTable, keySerializer, 
namespaceSerializer);
+   super(stateDesc, stateTable, keySerializer, 
namespaceSerializer);
+   this.mergeTransformation = new MergeTransformation();
}
 
+   private final MergeTransformation mergeTransformation;
--- End diff --

Curious placement of field. 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...

2017-03-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3483#discussion_r106139194
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java 
---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import java.util.Objects;
+
+/**
+ * One full entry in a state table. Consists of an immutable key (not 
null), an immutable namespace (not null), and
+ * a state that can be mutable and null.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of value
+ */
+public class StateEntry<K, N, S> {
--- End diff --

Is this used anywhere other than as a base class for `StateTableEntry` in 
`CopyOnWriteStateTable`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

