[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927536#comment-15927536 ]

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106343179

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -130,32 +142,76 @@ class DataStreamOverAggregate(
         val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
         val result: DataStream[Row] =
    -        // partitioned aggregation
    -        if (partitionKeys.nonEmpty) {
    -          val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    -            namedAggregates,
    -            inputType)
    +      // partitioned aggregation
    +      if (partitionKeys.nonEmpty) {
    +        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    +          namedAggregates,
    +          inputType)

    -          inputDS
    +        inputDS
               .keyBy(partitionKeys: _*)
               .process(processFunction)
               .returns(rowTypeInfo)
               .name(aggOpName)
               .asInstanceOf[DataStream[Row]]
    -        }
    -        // non-partitioned aggregation
    -        else {
    -          val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    -            namedAggregates,
    -            inputType,
    -            false)
    -
    -          inputDS
    -            .process(processFunction).setParallelism(1).setMaxParallelism(1)
    -            .returns(rowTypeInfo)
    -            .name(aggOpName)
    -            .asInstanceOf[DataStream[Row]]
    -        }
    +      } // non-partitioned aggregation
    +      else {
    +        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    +          namedAggregates,
    +          inputType,
    +          false)
    +
    +        inputDS
    +          .process(processFunction).setParallelism(1).setMaxParallelism(1)
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +      }
    +    result
    +  }
    +
    +  def createBoundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val lowerbound: Int = AggregateUtil.getLowerBoundary(
    +      logicWindow.constants,
    +      overWindow.lowerBound,
    +      getInput())
    +
    +    val result: DataStream[Row] =
    +      // partitioned aggregation
    +      if (partitionKeys.nonEmpty) {
    +        val windowFunction = AggregateUtil.CreateBoundedProcessingOverWindowFunction(
    +          namedAggregates,
    +          inputType)
    +        inputDS
    +          .keyBy(partitionKeys: _*)
    +          .countWindow(lowerbound,1)
    --- End diff --

    `lowerbound -> (lowerbound+1)`

> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
>
>                 Key: FLINK-5653
>                 URL: https://issues.apache.org/jira/browse/FLINK-5653
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are
[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927534#comment-15927534 ]

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106343147

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -130,32 +142,76 @@ class DataStreamOverAggregate(
         val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
         val result: DataStream[Row] =
    -        // partitioned aggregation
    -        if (partitionKeys.nonEmpty) {
    -          val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    -            namedAggregates,
    -            inputType)
    +      // partitioned aggregation
    +      if (partitionKeys.nonEmpty) {
    +        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    +          namedAggregates,
    +          inputType)

    -          inputDS
    +        inputDS
               .keyBy(partitionKeys: _*)
               .process(processFunction)
               .returns(rowTypeInfo)
               .name(aggOpName)
               .asInstanceOf[DataStream[Row]]
    -        }
    -        // non-partitioned aggregation
    -        else {
    -          val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    -            namedAggregates,
    -            inputType,
    -            false)
    -
    -          inputDS
    -            .process(processFunction).setParallelism(1).setMaxParallelism(1)
    -            .returns(rowTypeInfo)
    -            .name(aggOpName)
    -            .asInstanceOf[DataStream[Row]]
    -        }
    +      } // non-partitioned aggregation
    +      else {
    +        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    +          namedAggregates,
    +          inputType,
    +          false)
    +
    +        inputDS
    +          .process(processFunction).setParallelism(1).setMaxParallelism(1)
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +      }
    +    result
    +  }
    +
    +  def createBoundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val lowerbound: Int = AggregateUtil.getLowerBoundary(
    +      logicWindow.constants,
    +      overWindow.lowerBound,
    +      getInput())
    +
    +    val result: DataStream[Row] =
    +      // partitioned aggregation
    +      if (partitionKeys.nonEmpty) {
    +        val windowFunction = AggregateUtil.CreateBoundedProcessingOverWindowFunction(
    +          namedAggregates,
    +          inputType)
    +        inputDS
    +          .keyBy(partitionKeys: _*)
    +          .countWindow(lowerbound,1)
    +          .apply(windowFunction)
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +      } // global non-partitioned aggregation
    +      else {
    +        val windowFunction = AggregateUtil.CreateBoundedProcessingOverGlobalWindowFunction(
    +          namedAggregates,
    +          inputType)
    +
    +        inputDS
    +          .countWindowAll(lowerbound,1)
    --- End diff --

    `lowerbound -> (lowerbound+1)`
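
The review comment `lowerbound -> (lowerbound+1)` concerns the window size: `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` covers three rows, the two preceding ones plus the current one. The following is a minimal plain-Java sketch of that arithmetic, not Flink code. It assumes that `lowerbound` holds the `PRECEDING` count (2 in the queries quoted in this thread); `RowsOverWindowSketch`, `windowSize` and `slidingMax` are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative only: models a row-count OVER window, independent of Flink. */
final class RowsOverWindowSketch {

    /** Rows covered by ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW. */
    static int windowSize(int preceding) {
        return preceding + 1; // the current row is part of the window
    }

    /** Emits one MAX per input row, computed over the last windowSize(preceding) rows. */
    static List<Integer> slidingMax(List<Integer> values, int preceding) {
        int size = windowSize(preceding);
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> out = new ArrayList<>();
        for (int v : values) {
            window.addLast(v);
            if (window.size() > size) {
                window.removeFirst(); // evict the row that fell out of the window
            }
            int max = Integer.MIN_VALUE;
            for (int w : window) {
                max = Math.max(max, w);
            }
            out.add(max);
        }
        return out;
    }
}
```

For the input 5, 1, 2, 0, 3 and `preceding = 2`, this yields 5, 5, 5, 2, 3. A window of only two rows would yield 5, 5, 2, 2, 3 instead, which is the kind of off-by-one result the comment appears to guard against.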
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106342776

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.java.stream.sql;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.flink.api.java.tuple.Tuple5;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +import org.apache.flink.table.api.Table;
    +import org.apache.flink.table.api.TableEnvironment;
    +import org.apache.flink.table.api.java.StreamTableEnvironment;
    +import org.apache.flink.table.api.java.stream.utils.StreamTestData;
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
    +import org.apache.flink.types.Row;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
    +
    +
    +    @Test
    +    public void testMaxAggregatation() throws Exception {
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +        StreamITCase.clear();
    +
    +        env.setParallelism(1);
    +
    +        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
    +        Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
    +        tableEnv.registerTable("MyTable", in);
    +
    +        String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
    +        Table result = tableEnv.sql(sqlQuery);
    +
    +        DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
    +        resultSet.addSink(new StreamITCase.StringSink());
    +        env.execute();
    +
    +        List<String> expected = new ArrayList<>();
    +        expected.add("1,0");
    +        expected.add("2,1");
    +        expected.add("2,2");
    +        expected.add("3,3");
    +        expected.add("3,4");
    +        expected.add("3,5");
    +        expected.add("4,6");
    +        expected.add("4,7");
    +        expected.add("4,8");
    +        expected.add("4,9");
    +        expected.add("5,10");
    +        expected.add("5,11");
    +        expected.add("5,12");
    +        expected.add("5,14");
    +        expected.add("5,14");
    +
    +        StreamITCase.compareWithList(expected);
    +    }
    +
    +    @Test
    +    public void testMinAggregatation() throws Exception {
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +        StreamITCase.clear();
    +
    +        env.setParallelism(1);
    +
    +        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
    +        Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
    +        tableEnv.registerTable("MyTable", in);
    +
    +        String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
    +        Table result = tableEnv.sql(sqlQuery);
    +
    +        DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
    +        resultSet.addSink(new StreamITCase.StringSink());
    +        env.execute();
    +
    +        List<String> expected = new ArrayList<>();
    +        expected.add("1,0");
    +
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927520#comment-15927520 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/3520

    Hi @fhueske @godfreyhe, thanks for the review. I addressed almost all of your comments.

    @fhueske The exception is letting `TableSourceScan` be aware of whether a filter has been pushed down. I'm not sure `TableSourceScan` should hold this kind of information; I'd prefer to keep it within the various `TableSource` implementations. One drawback of letting `TableSourceScan` hold such information is that when we copy a `TableSourceScan`, we have to make sure all of it is copied correctly as well. In the future, if we add more extensions to `TableSource`, such as pushing part of the query down, we will face this problem again.

    Regarding the interface of `FilterableTableSource`, I agree with you that a trait containing logic is not friendly to Java extensions. So I removed the default implementation of `isFilterPushedDown`; the inheriting class should take care of this method.

    And regarding the `Tuple2` thing, how about we pass in a mutable Java list and let the table source *pick out* expressions from it and return a copy of the table source that contains these pushed-down predicates?

> Add FilterableTableSource interface and translation rule
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
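
The "mutable list" proposal in the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the interface from the pull request: predicates are modeled as plain strings instead of Flink `Expression` objects, and `PushDownSketch`, `FilterableSource`, `applyPredicates`, `pushedPredicates` and `ColumnASource` are hypothetical names. Only `isFilterPushedDown` is taken from the discussion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Illustrative only: a stand-in for the proposed contract, not Flink's interface. */
final class PushDownSketch {

    interface FilterableSource {
        /**
         * Removes every predicate this source can evaluate from the given mutable
         * list and returns a copy of the source that holds the removed predicates.
         * Whatever remains in the list still has to be evaluated by the caller.
         */
        FilterableSource applyPredicates(List<String> predicates);

        /** No default implementation: each source reports its own state. */
        boolean isFilterPushedDown();

        List<String> pushedPredicates();
    }

    /** Toy source that only understands predicates on column "a". */
    static final class ColumnASource implements FilterableSource {
        private final List<String> pushed;

        ColumnASource() {
            this(Collections.<String>emptyList());
        }

        private ColumnASource(List<String> pushed) {
            this.pushed = pushed;
        }

        @Override
        public FilterableSource applyPredicates(List<String> predicates) {
            List<String> accepted = new ArrayList<>(pushed);
            for (Iterator<String> it = predicates.iterator(); it.hasNext();) {
                String p = it.next();
                if (p.startsWith("a ")) {
                    accepted.add(p);
                    it.remove(); // picked out: the caller no longer evaluates it
                }
            }
            // The original instance stays untouched; the copy carries the state.
            return new ColumnASource(Collections.unmodifiableList(accepted));
        }

        @Override
        public boolean isFilterPushedDown() {
            return !pushed.isEmpty();
        }

        @Override
        public List<String> pushedPredicates() {
            return pushed;
        }
    }
}
```

With this shape the pushed-down state lives entirely in the source copy, so a scan node that merely references the source would not need extra fields that must be kept in sync when the node is copied.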
[jira] [Commented] (FLINK-5978) JM WebFrontend address ConfigOption is defined in ConfigConstants
[ https://issues.apache.org/jira/browse/FLINK-5978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927494#comment-15927494 ]

ASF GitHub Bot commented on FLINK-5978:
---

GitHub user zjureel opened a pull request:

    https://github.com/apache/flink/pull/3552

    [FLINK-5978] Fix JM WebFrontend address ConfigOption is defined in Co…

    …nfigConstants

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3552

commit 6a611f5b1649af8f40c97014267a681cec1c0df9
Author: mengji.fy
Date:   2017-03-16T04:53:03Z

    [FLINK-5978] Fix JM WebFrontend address ConfigOption is defined in ConfigConstants

> JM WebFrontend address ConfigOption is defined in ConfigConstants
>
>                 Key: FLINK-5978
>                 URL: https://issues.apache.org/jira/browse/FLINK-5978
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.0
>            Reporter: Chesnay Schepler
>            Priority: Blocker
>             Fix For: 1.3.0
>
> The DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS ConfigOption is defined in
> ConfigConstants instead of JobManagerOptions.
> Additionally, the name should not contain DEFAULT_ since it doesn't even
> define a default value...
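
The two points of the issue (the option belongs in an options holder, and its name should not say `DEFAULT_` when it has no default) can be illustrated with a small self-contained sketch. It deliberately does not use Flink's real `ConfigOption` classes: `ConfigOptionSketch`, its nested `ConfigOption` stand-in and `JobManagerOptionsSketch` are hypothetical, and both the constant name `WEB_FRONTEND_ADDRESS` and the key string `jobmanager.web.address` are assumptions made for illustration.

```java
/** Illustrative only: a tiny stand-in for a typed config option, not Flink's API. */
final class ConfigOptionSketch {

    static final class ConfigOption<T> {
        private final String key;
        private final T defaultValue; // null means "no default defined"

        private ConfigOption(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        static <T> ConfigOption<T> noDefault(String key) {
            return new ConfigOption<>(key, null);
        }

        static <T> ConfigOption<T> withDefault(String key, T value) {
            return new ConfigOption<>(key, value);
        }

        String key() {
            return key;
        }

        boolean hasDefaultValue() {
            return defaultValue != null;
        }
    }

    /** Stand-in for an options holder such as JobManagerOptions. */
    static final class JobManagerOptionsSketch {
        // No DEFAULT_ prefix: the option names a key and defines no default value.
        // The key string below is an assumption, not taken from the thread.
        static final ConfigOption<String> WEB_FRONTEND_ADDRESS =
            ConfigOption.noDefault("jobmanager.web.address");

        private JobManagerOptionsSketch() {
        }
    }
}
```

Keeping the constant next to the other JobManager options and naming it after the key, rather than after a default it does not have, is the whole change the issue asks for.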
[jira] [Resolved] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai resolved FLINK-5701.
    Resolution: Fixed

Additionally fixed for 1.1.5 with http://git-wip-us.apache.org/repos/asf/flink/commit/6662cc6.

> FlinkKafkaProducer should check asyncException on checkpoints
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.3.0, 1.1.5, 1.2.1
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each
> invoke() and decremented on each callback, used to check if the producer
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff
> after flushing the {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the
> {{snapshotState}} method both before and after flushing and
> {{pendingRecords}} becomes 0.
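
The quick fix described in the issue can be modeled with a small single-threaded sketch. This is not the actual `FlinkKafkaProducer` code: the class `AsyncErrorCheckSketch`, the `inFlight` queue and the `willFail` flag are hypothetical and only simulate asynchronous send callbacks, while `pendingRecords`, `asyncException`, `invoke` and `snapshotState` follow the names used in the issue text.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative only: simplified bookkeeping, not the real Kafka producer. */
final class AsyncErrorCheckSketch {
    private long pendingRecords;
    private Exception asyncException;
    // Simulated in-flight sends; true means the send will fail on completion.
    private final Deque<Boolean> inFlight = new ArrayDeque<>();

    /** Sends one record; the outcome is only known once the callback runs. */
    void invoke(boolean willFail) {
        checkErroneous();
        pendingRecords++;
        inFlight.addLast(willFail);
    }

    /** Callback: records the first failure and decrements the pending count. */
    private void onCompletion(Exception error) {
        if (error != null && asyncException == null) {
            asyncException = error;
        }
        pendingRecords--;
    }

    /** Completes every simulated in-flight send. */
    private void flush() {
        while (!inFlight.isEmpty()) {
            boolean failed = inFlight.removeFirst();
            onCompletion(failed ? new Exception("send failed") : null);
        }
    }

    /** The checkpoint succeeds only if nothing is pending and nothing failed. */
    void snapshotState() {
        checkErroneous(); // failures reported before this checkpoint
        flush();
        if (pendingRecords != 0) {
            throw new IllegalStateException("Pending records after flush: " + pendingRecords);
        }
        checkErroneous(); // failures reported while flushing
    }

    private void checkErroneous() {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw new IllegalStateException("Failed to send data: " + e.getMessage(), e);
        }
    }

    long pendingRecords() {
        return pendingRecords;
    }
}
```

Checking only `pendingRecords == 0` would let the checkpoint pass in the failing case, because the failed callback still decrements the counter; the second `checkErroneous()` call is what turns the lost record into a failed checkpoint.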
[jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927479#comment-15927479 ]

ASF GitHub Bot commented on FLINK-5701:
---

Github user tzulitai closed the pull request at:

    https://github.com/apache/flink/pull/3549

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927464#comment-15927464 ]

ASF GitHub Bot commented on FLINK-5653:
---

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106336720

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate

 import java.util

-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

Yes, I have cloned your code. Maybe in this case we can use `._`, e.g.:
`import org.apache.flink.table.functions.aggfunctions._`
That way, we can reduce 53 lines to 1 line. What do you think?

> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
>
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
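To make the requested semantics concrete, here is a rough sketch of what `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` computes per partition when rows are processed in arrival (processing-time) order. It is only an illustration of the window semantics, not the DataStream operator from the pull request; the class `BoundedRowsOverSketch` and its method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of a bounded OVER ROWS window; not Flink code. */
class BoundedRowsOverSketch {

    private final int preceding;
    // One sliding buffer of the most recent values of b per partition key c.
    private final Map<String, Deque<Long>> windows = new HashMap<>();

    BoundedRowsOverSketch(int preceding) {
        this.preceding = preceding;
    }

    /** Processes one row (c, b) in arrival order and returns {sumB, minB}. */
    long[] process(String c, long b) {
        Deque<Long> window = windows.computeIfAbsent(c, k -> new ArrayDeque<>());
        window.addLast(b);
        // Keep the current row plus at most `preceding` earlier rows.
        if (window.size() > preceding + 1) {
            window.removeFirst();
        }
        long sum = 0;
        long min = Long.MAX_VALUE;
        for (long v : window) {
            sum += v;
            min = Math.min(min, v);
        }
        return new long[] {sum, min};
    }
}
```

For the values 5, 3, 7, 1 arriving in that order in one partition, the fourth row sees the window (3, 7, 1), so `sumB` is 11 and `minB` is 1. Recomputing the aggregates on every row keeps the sketch short; an operator meant for production use would more likely maintain them incrementally.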
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927457#comment-15927457 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106335718

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.expressions.Expression
+
+/**
+ * Adds support for filtering push-down to a [[TableSource]].
+ * A [[TableSource]] extending this interface is able to filter records before returning.
+ */
+trait FilterableTableSource[T] extends TableSource[T] {
--- End diff --

Changed.

> Add FilterableTableSource interface and translation rule
>
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
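The contract proposed in the issue description above (the source takes what it can evaluate and hands back the unsupported remainder) could look roughly like this. The sketch assumes the predicate has already been split into independent conjuncts; `FilterPushDownSketch`, `Comparison` and `EqualityOnlySource` are hypothetical names used for illustration and are not Flink or Calcite types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the setPredicate contract; not Flink's interface. */
class FilterPushDownSketch {

    /** A single conjunct of the form "field op value". */
    static final class Comparison {
        final String field;
        final String op;
        final int value;

        Comparison(String field, String op, int value) {
            this.field = field;
            this.op = op;
            this.value = value;
        }
    }

    /** A source that can only evaluate equality predicates itself. */
    static final class EqualityOnlySource {
        private final List<Comparison> pushed = new ArrayList<>();

        /** Accepts the conjuncts it supports and returns the unsupported ones. */
        List<Comparison> setPredicate(List<Comparison> conjuncts) {
            List<Comparison> unsupported = new ArrayList<>();
            for (Comparison c : conjuncts) {
                if ("=".equals(c.op)) {
                    pushed.add(c);
                } else {
                    unsupported.add(c);
                }
            }
            return unsupported;
        }

        List<Comparison> pushedPredicates() {
            return pushed;
        }
    }
}
```

The planner would keep a filter for whatever comes back from `setPredicate`, so a source that understands nothing can simply return its input unchanged and the query stays correct.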
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106335709

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+    * Extracts the indices of input fields which accessed by the RexProgram.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The indices of accessed input fields
+    */
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+    val visitor = new InputRefVisitor
+
+    // extract referenced input fields from projections
+    rexProgram.getProjectList.foreach(
+      exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+    // extract referenced input fields from condition
+    val condition = rexProgram.getCondition
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+
+    visitor.getFields
+  }
+
+  /**
+    * Extract condition from RexProgram and convert it into independent CNF expressions.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return converted expressions as well as RexNodes which cannot be translated
+    */
+  def extractConjunctiveConditions(
+      rexProgram: RexProgram,
+      rexBuilder: RexBuilder,
+      catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+    rexProgram.getCondition match {
+      case condition: RexLocalRef =>
+        val expanded = rexProgram.expandLocalRef(condition)
+        // converts the expanded expression to conjunctive normal form,
+        // like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)"
+        val cnf = RexUtil.toCnf(rexBuilder, expanded)
+        // converts the cnf condition to a list of AND conditions
+        val conjunctions = RelOptUtil.conjunctions(cnf)
+
+        val convertedExpressions = new mutable.ArrayBuffer[Expression]
+        val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+        val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray
+        val converter = new ConvertToExpression(inputNames, catalog)
+
+        conjunctions.asScala.foreach(rex => {
+          rex.accept(converter) match {
+            case Some(expression) => convertedExpressions += expression
+            case None => unconvertedRexNodes += rex
+          }
+        })
+        (convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+      case _ => (Array.empty, Array.empty)
+    }
+  }
+}
+
+/**
+ * An RexVisitor to extract all referenced input fields
+ */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+    fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+ * An RexVisitor to convert RexNode to
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106335660

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927451#comment-15927451 ]

ASF GitHub Bot commented on FLINK-3930:
---

Github user WangTaoTheTonic commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2425#discussion_r106335560

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java ---
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CookieHandler {
+
+    public static class ClientCookieHandler extends ChannelInboundHandlerAdapter {
+
+        private final Logger LOG = LoggerFactory.getLogger(ClientCookieHandler.class);
+
+        private final String secureCookie;
+
+        final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
+        public ClientCookieHandler(String secureCookie) {
+            this.secureCookie = secureCookie;
+        }
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            super.channelActive(ctx);
+            LOG.debug("In channelActive method of ClientCookieHandler");
+
+            if(this.secureCookie != null && this.secureCookie.length() != 0) {
+                LOG.debug("In channelActive method of ClientCookieHandler -> sending secure cookie");
+                final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length);
+                buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
+                buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
+                ctx.writeAndFlush(buffer);
+            }
+        }
+    }
+
+    public static class ServerCookieDecoder extends MessageToMessageDecoder {
+
+        private final String secureCookie;
+
+        private final List channelList = new ArrayList<>();
+
+        private final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+
+        private final Logger LOG = LoggerFactory.getLogger(ServerCookieDecoder.class);
+
+        public ServerCookieDecoder(String secureCookie) {
+            this.secureCookie = secureCookie;
+        }
+
+        /**
+         * Decode from one message to an other. This method will be called for each written message that can be handled
+         * by this encoder.
+         *
+         * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to
+         * @param msg the message to decode to an other one
+         * @param out the {@link List} to which decoded messages should be added
+         * @throws Exception is thrown if an error accour
+         */
+        @Override
+        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
+
+            LOG.debug("ChannelHandlerContext name: {}, channel: {}", ctx.name(), ctx.channel());
+
+            if(secureCookie == null || secureCookie.length() == 0) {
+                LOG.debug("Not validating secure cookie since the server configuration is not enabled to use cookie");
+
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r106335560 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java --- @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CookieHandler { + + public static class ClientCookieHandler extends ChannelInboundHandlerAdapter { + + private final Logger LOG = LoggerFactory.getLogger(ClientCookieHandler.class); + + private final String secureCookie; + + final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + + public ClientCookieHandler(String secureCookie) { + this.secureCookie = secureCookie; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.debug("In channelActive method of ClientCookieHandler"); + + if(this.secureCookie != null && this.secureCookie.length() != 0) { + LOG.debug("In channelActive method of ClientCookieHandler -> sending secure cookie"); + final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET)); + ctx.writeAndFlush(buffer); + } + } + } + + public static class ServerCookieDecoder extends MessageToMessageDecoder { + + private final String secureCookie; + + private final List channelList = new ArrayList<>(); + + private final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + + private final Logger LOG = LoggerFactory.getLogger(ServerCookieDecoder.class); + + public ServerCookieDecoder(String secureCookie) { + this.secureCookie = secureCookie; + } + + /** +* Decode from one message to an other. 
This method will be called for each written message that can be handled +* by this encoder. +* +* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to +* @param msg the message to decode to an other one +* @param out the {@link List} to which decoded messages should be added +* @throws Exception is thrown if an error accour +*/ + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + + LOG.debug("ChannelHandlerContext name: {}, channel: {}", ctx.name(), ctx.channel()); + + if(secureCookie == null || secureCookie.length() == 0) { + LOG.debug("Not validating secure cookie since the server configuration is not enabled to use cookie"); + return; + } + + LOG.debug("Going to decode the secure cookie passed by the remote client"); + + if(channelList.contains(ctx.channel())) {
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r106335331 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java --- @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CookieHandler { + + public static class ClientCookieHandler extends ChannelInboundHandlerAdapter { + + private final Logger LOG = LoggerFactory.getLogger(ClientCookieHandler.class); + + private final String secureCookie; + + final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + + public ClientCookieHandler(String secureCookie) { + this.secureCookie = secureCookie; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.debug("In channelActive method of ClientCookieHandler"); + + if(this.secureCookie != null && this.secureCookie.length() != 0) { + LOG.debug("In channelActive method of ClientCookieHandler -> sending secure cookie"); + final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET)); + ctx.writeAndFlush(buffer); + } + } + } + + public static class ServerCookieDecoder extends MessageToMessageDecoder { + + private final String secureCookie; + + private final List channelList = new ArrayList<>(); --- End diff -- Is it better to use `Set` instead of a `List` here? As it is mainly used for lookup. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927446#comment-15927446 ] ASF GitHub Bot commented on FLINK-3930: --- Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r106335331 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java --- @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CookieHandler { + + public static class ClientCookieHandler extends ChannelInboundHandlerAdapter { + + private final Logger LOG = LoggerFactory.getLogger(ClientCookieHandler.class); + + private final String secureCookie; + + final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + + public ClientCookieHandler(String secureCookie) { + this.secureCookie = secureCookie; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.debug("In channelActive method of ClientCookieHandler"); + + if(this.secureCookie != null && this.secureCookie.length() != 0) { + LOG.debug("In channelActive method of ClientCookieHandler -> sending secure cookie"); + final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET)); + ctx.writeAndFlush(buffer); + } + } + } + + public static class ServerCookieDecoder extends MessageToMessageDecoder { + + private final String secureCookie; + + private final List channelList = new ArrayList<>(); --- End diff -- Is it better to use `Set` instead of a `List` here? As it is mainly used for lookup. 
> Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA
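The review comment on `channelList` suggests a `Set` because the collection is mainly used for membership checks. A minimal sketch of that idea follows, with plain strings standing in for Netty `Channel` objects. The class `AuthorizedChannels` and its methods are hypothetical and are not taken from the pull request.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the decoder's bookkeeping of already-validated channels. */
final class AuthorizedChannels {

    // HashSet.contains() is expected O(1); ArrayList.contains() scans the whole list.
    private final Set<String> channels = new HashSet<>();

    /** Records a channel id; returns false if it was already recorded. */
    boolean markAuthorized(String channelId) {
        return channels.add(channelId);
    }

    /** Membership check, the operation the review comment is concerned with. */
    boolean isAuthorized(String channelId) {
        return channels.contains(channelId);
    }

    int size() {
        return channels.size();
    }

    public static void main(String[] args) {
        AuthorizedChannels seen = new AuthorizedChannels();
        seen.markAuthorized("channel-1");
        System.out.println(seen.isAuthorized("channel-1"));
        System.out.println(seen.isAuthorized("channel-2"));
    }
}
```

A `Set` also rules out duplicate entries for the same channel. Whether the handler is shared across threads is not stated in the thread; if it were, a concurrent set would be the safer assumption.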
[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927442#comment-15927442 ] ASF GitHub Bot commented on FLINK-5653: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106335062 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala --- @@ -18,8 +18,6 @@ package org.apache.flink.table.plan.logical.rel -import java.util --- End diff --
Your current package is `org.apache.flink.table.plan.logical.rel`, and this package also contains `org.apache.flink.table.plan.logical.rel.util`. That package conflicts with the `util` in `util.List`. You can try either of the following two solutions:
Solution 1:
```
import java.util.List
groupSets: List[ImmutableBitSet],
aggCalls: List[AggregateCall])
```
Solution 2:
```
import java.{util => JUtil}
groupSets: JUtil.List[ImmutableBitSet],
aggCalls: JUtil.List[AggregateCall])
```
> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > > > Key: FLINK-5653 > URL: https://issues.apache.org/jira/browse/FLINK-5653 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Stefano Bortoli > > The goal of this issue is to add support for OVER ROWS aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter.
procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5656) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
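To make the semantics of `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` in the issue description concrete, here is a minimal sketch in plain Java. It assumes rows arrive in processing-time order and keeps, per partition key, only the current row plus the preceding ones. The class `BoundedRowsOver` is hypothetical and is not the DataStream operator from the pull request.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of ROWS BETWEEN n PRECEDING AND CURRENT ROW per partition. */
final class BoundedRowsOver {

    private final int preceding;
    // One buffer of recent values per PARTITION BY key.
    private final Map<String, Deque<Long>> buffers = new HashMap<>();

    BoundedRowsOver(int preceding) {
        this.preceding = preceding;
    }

    /** Adds one row in arrival order and returns {sum, min} over that row's window. */
    long[] add(String partitionKey, long value) {
        Deque<Long> window = buffers.computeIfAbsent(partitionKey, k -> new ArrayDeque<>());
        window.addLast(value);
        if (window.size() > preceding + 1) {
            window.removeFirst(); // evict the row that fell out of the window
        }
        long sum = 0;
        long min = Long.MAX_VALUE;
        for (long v : window) {
            sum += v;
            min = Math.min(min, v);
        }
        return new long[] {sum, min};
    }

    public static void main(String[] args) {
        BoundedRowsOver over = new BoundedRowsOver(2);
        for (long b : new long[] {5, 3, 8, 1}) {
            long[] r = over.add("c1", b);
            System.out.println("sumB=" + r[0] + " minB=" + r[1]);
        }
    }
}
```

The sketch recomputes both aggregates on every row for clarity; an actual operator would more likely maintain them incrementally and keep the buffer in managed state.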
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106335062 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala --- @@ -18,8 +18,6 @@ package org.apache.flink.table.plan.logical.rel -import java.util --- End diff --
Your current package is `org.apache.flink.table.plan.logical.rel`, and this package also contains `org.apache.flink.table.plan.logical.rel.util`. That package conflicts with the `util` in `util.List`. You can try either of the following two solutions:
Solution 1:
```
import java.util.List
groupSets: List[ImmutableBitSet],
aggCalls: List[AggregateCall])
```
Solution 2:
```
import java.{util => JUtil}
groupSets: JUtil.List[ImmutableBitSet],
aggCalls: JUtil.List[AggregateCall])
```
[jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927441#comment-15927441 ] ASF GitHub Bot commented on FLINK-5701: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3549 Thanks for the review! Travis is green (local branch), with only test timeouts: https://travis-ci.org/tzulitai/flink Merging this .. > FlinkKafkaProducer should check asyncException on checkpoints > - > > Key: FLINK-5701 > URL: https://issues.apache.org/jira/browse/FLINK-5701 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0, 1.1.5, 1.2.1 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html > The problem: > The producer holds a {{pendingRecords}} value that is incremented on each > invoke() and decremented on each callback, used to check if the producer > needs to sync on pending callbacks on checkpoints. > On each checkpoint, we should only consider the checkpoint succeeded iff > after flushing the {{pendingRecords == 0}} and {{asyncException == null}} > (currently, we’re only checking {{pendingRecords}}). > A quick fix for this is to check and rethrow async exceptions in the > {{snapshotState}} method both before and after flushing and > {{pendingRecords}} becomes 0. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
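The fix described in the issue (check `asyncException` both before and after flushing, in addition to `pendingRecords == 0`) can be sketched as follows. This is a simplified model under stated assumptions, not the actual `FlinkKafkaProducer` code: the class `CheckpointedProducerSketch`, the `flush` callback, and the use of unchecked exceptions are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical producer skeleton illustrating the checkpoint-time checks. */
final class CheckpointedProducerSketch {

    private final AtomicLong pendingRecords = new AtomicLong();
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    /** Called for each record handed to the (simulated) asynchronous client. */
    void invoke() {
        checkErroneous();
        pendingRecords.incrementAndGet();
    }

    /** Simulates the client's send callback; error is null on success. */
    void onCompletion(Exception error) {
        if (error != null) {
            asyncException.compareAndSet(null, error); // remember the first failure
        }
        pendingRecords.decrementAndGet();
    }

    /** Checkpoint hook: rethrow async failures before and after the flush. */
    void snapshotState(Runnable flush) {
        checkErroneous();
        flush.run(); // assumed to block until all outstanding callbacks have fired
        if (pendingRecords.get() != 0) {
            throw new IllegalStateException("Pending records after flush: " + pendingRecords.get());
        }
        checkErroneous();
    }

    private void checkErroneous() {
        Exception e = asyncException.getAndSet(null);
        if (e != null) {
            throw new IllegalStateException("Failed to send data: " + e.getMessage(), e);
        }
    }
}
```

Without the second `checkErroneous()` call, a send that fails during the flush would still bring `pendingRecords` to zero, and the checkpoint would be treated as successful even though data was lost.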
[GitHub] flink issue #3549: [backport-1.1] [FLINK-5701] [kafka] FlinkKafkaProducer sh...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3549 Thanks for the review! Travis is green (local branch), with only test timeouts: https://travis-ci.org/tzulitai/flink Merging this ..
[jira] [Resolved] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-5048. Resolution: Fixed Fix Version/s: (was: 1.1.5) > Kafka Consumer (0.9/0.10) threading model leads problematic cancellation > behavior > - > > Key: FLINK-5048 > URL: https://issues.apache.org/jira/browse/FLINK-5048 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0 > > > The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that > operates the KafkaConsumer. That thread is shielded from interrupts, because > the Kafka Consumer has not been handling thread interrupts well. > Since that thread is also the thread that emits records, it may block in the > network stack (backpressure) or in chained operators. The latter case leads to > situations where cancellations get very slow unless that thread would be > interrupted (which it cannot be). > I propose to change the thread model as follows: > - A spawned consumer thread pulls from the KafkaConsumer and pushes its > pulled batch of records into a blocking queue (size one) > - The main thread of the task will pull the record batches from the > blocking queue and emit the records. > This actually allows for some additional I/O overlap while limiting the > additional memory consumption - only two batches are ever held, one being > fetched and one being emitted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
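The threading model proposed in the issue description can be sketched with a `java.util.concurrent.ArrayBlockingQueue` of capacity one. This is only an illustration of the proposal as worded in the issue: `HandoverSketch`, the in-memory `batches` argument (standing in for polling the KafkaConsumer), and the `END` sentinel are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: a consumer thread hands batches to the main thread via a size-one queue. */
final class HandoverSketch {

    // Sentinel marking the end of input; compared by identity, never emitted.
    private static final List<String> END = new ArrayList<>();

    static List<String> run(List<List<String>> batches) {
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

        // Stands in for the thread that polls the KafkaConsumer.
        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    handover.put(batch); // blocks while the previous batch is unclaimed
                }
                handover.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread-sketch");
        consumerThread.start();

        // The main task thread emits records; take() is interruptible,
        // so cancelling the task can reach this thread promptly.
        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = handover.take();
                if (batch == END) {
                    break;
                }
                emitted.addAll(batch);
            }
            consumerThread.join();
        } catch (InterruptedException e) {
            consumerThread.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while emitting", e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"))));
    }
}
```

The point of the design is that the thread which may block on backpressure (the emitting thread) is now the task's main thread and can be interrupted, while the thread shielded from interrupts only ever blocks on the queue or on the client.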
[jira] [Comment Edited] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927427#comment-15927427 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5048 at 3/16/17 3:43 AM: - Agreed, let's mark this issue as resolved and remove {{1.1.5}} from the fix versions then. was (Author: tzulitai): Agreed, let's resolve this issue and remove {{1.1.5}} from the fix versions then. > Kafka Consumer (0.9/0.10) threading model leads problematic cancellation > behavior > - > > Key: FLINK-5048 > URL: https://issues.apache.org/jira/browse/FLINK-5048 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.5 > > > The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that > operates the KafkaConsumer. That thread is shielded from interrupts, because > the Kafka Consumer has not been handling thread interrupts well. > Since that thread is also the thread that emits records, it may block in the > network stack (backpressure) or in chained operators. The latter case leads to > situations where cancellations get very slow unless that thread would be > interrupted (which it cannot be). > I propose to change the thread model as follows: > - A spawned consumer thread pulls from the KafkaConsumer and pushes its > pulled batch of records into a blocking queue (size one) > - The main thread of the task will pull the record batches from the > blocking queue and emit the records. > This actually allows for some additional I/O overlap while limiting the > additional memory consumption - only two batches are ever held, one being > fetched and one being emitted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927427#comment-15927427 ] Tzu-Li (Gordon) Tai commented on FLINK-5048: Agreed, let's resolve this issue and remove {{1.1.5}} from the fix versions then. > Kafka Consumer (0.9/0.10) threading model leads problematic cancellation > behavior > - > > Key: FLINK-5048 > URL: https://issues.apache.org/jira/browse/FLINK-5048 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.5 > > > The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that > operates the KafkaConsumer. That thread is shielded from interrupts, because > the Kafka Consumer has not been handling thread interrupts well. > Since that thread is also the thread that emits records, it may block in the > network stack (backpressure) or in chained operators. The latter case leads to > situations where cancellations get very slow unless that thread would be > interrupted (which it cannot be). > I propose to change the thread model as follows: > - A spawned consumer thread pulls from the KafkaConsumer and pushes its > pulled batch of records into a blocking queue (size one) > - The main thread of the task will pull the record batches from the > blocking queue and emit the records. > This actually allows for some additional I/O overlap while limiting the > additional memory consumption - only two batches are ever held, one being > fetched and one being emitted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5978) JM WebFrontend address ConfigOption is defined in ConfigConstants
[ https://issues.apache.org/jira/browse/FLINK-5978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927411#comment-15927411 ] fang yong commented on FLINK-5978: -- Hi Chesnay Schepler, glad to see an easy issue while I'm new, could you please assign it to me? > JM WebFrontend address ConfigOption is defined in ConfigConstants > - > > Key: FLINK-5978 > URL: https://issues.apache.org/jira/browse/FLINK-5978 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.3.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.3.0 > > > The DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS ConfigOption is defined in > ConfigConstants instead of JobManagerOptions. > Additionally, the name should not contain DEFAULT_ since it doesn't even > define a default value... -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule( + operand(classOf[DataSetCalc], +operand(classOf[BatchTableSourceScan], none)), + "PushFilterIntoBatchTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] +val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329225 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927379#comment-15927379 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule( + operand(classOf[DataSetCalc], +operand(classOf[BatchTableSourceScan], none)), + "PushFilterIntoBatchTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] +val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
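The contract sketched in the issue description (`setPredicate` returns the part of the predicate the source cannot handle) can be illustrated in plain Java. This is a loose model, not the Flink interface: predicates are represented as a list of conjunct strings instead of `Expression` objects, and `FilterableSourceSketch` and `ColumnASource` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical Java rendering of the proposed contract; predicates are conjunct strings. */
interface FilterableSourceSketch {
    /** Accepts the conjuncts the source can evaluate itself and returns the rest. */
    List<String> setPredicate(List<String> conjuncts);
}

/** Example source that, by assumption, can only filter on column "a". */
final class ColumnASource implements FilterableSourceSketch {

    final List<String> pushedDown = new ArrayList<>();

    @Override
    public List<String> setPredicate(List<String> conjuncts) {
        List<String> unsupported = new ArrayList<>();
        for (String conjunct : conjuncts) {
            if (conjunct.startsWith("a ")) {
                pushedDown.add(conjunct);  // evaluated inside the source
            } else {
                unsupported.add(conjunct); // must stay in the Calc above the scan
            }
        }
        return unsupported;
    }

    public static void main(String[] args) {
        ColumnASource source = new ColumnASource();
        List<String> remaining = source.setPredicate(Arrays.asList("a > 5", "b = 'x'"));
        System.out.println("pushed down: " + source.pushedDown);
        System.out.println("remaining:   " + remaining);
    }
}
```

Returning the unsupported remainder lets the planner keep a residual filter above the scan, so a source that supports only part of a predicate still produces correct results.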
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106328751

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---
    @@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
     /** Table which defines an external table via a [[TableSource]] */
     class TableSourceTable[T](
         val tableSource: TableSource[T],
    +    val tableEnv: TableEnvironment,
    --- End diff --

    Yes, you are right. Especially since UDFs are currently registered as objects rather than classes, it's really impossible to let a TableSource support this. I will remove this field and only use built-in functions when extracting expressions from the RexProgram.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927380#comment-15927380 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329225 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329099 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. 
+* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. +* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class 
InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef): Unit = +fields += inputRef.getIndex + + override def visitCall(call: RexCall): Unit = +call.operands.foreach(operand => operand.accept(this)) +} + +/** + * An RexVisitor to convert RexNode to
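A minimal sketch of the step that extractConjunctiveConditions describes in its comments: rewrite the condition to conjunctive normal form, then split it into independent AND-ed parts. The Expr types and the CnfSketch object are hypothetical stand-ins, not Calcite's RexNode, RexUtil.toCnf or RelOptUtil.conjunctions, and negation is left out for brevity.

```scala
// Toy boolean expression tree (hypothetical; not Calcite's RexNode).
sealed trait Expr
case class Atom(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

object CnfSketch {
  // Converts an AND/OR tree to conjunctive normal form by distributing OR over AND.
  def toCnf(e: Expr): Expr = e match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r)  => distribute(toCnf(l), toCnf(r))
    case a: Atom   => a
  }

  // Builds the CNF of (l OR r), assuming l and r are already in CNF.
  private def distribute(l: Expr, r: Expr): Expr = (l, r) match {
    case (And(a, b), c) => And(distribute(a, c), distribute(b, c))
    case (a, And(b, c)) => And(distribute(a, b), distribute(a, c))
    case _              => Or(l, r)
  }

  // Flattens nested ANDs into the list of top-level conjuncts.
  def conjunctions(e: Expr): List[Expr] = e match {
    case And(l, r) => conjunctions(l) ++ conjunctions(r)
    case other     => List(other)
  }
}
```

With this sketch, "(a AND b) OR c" becomes "(a OR c) AND (b OR c)", and each of the two conjuncts can then be offered to the table source on its own.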
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927377#comment-15927377 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329079 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { --- End diff -- Will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927378#comment-15927378 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329099 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef):
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106329079

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.common
    +
    +import org.apache.calcite.plan.RelOptRuleCall
    +import org.apache.calcite.rel.core.Calc
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.TableSourceScan
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.util.RexProgramExtractor
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +trait PushFilterIntoTableSourceScanRuleBase {
    +
    +  private[flink] def pushFilterIntoScan(
    +      call: RelOptRuleCall,
    +      calc: Calc,
    +      scan: TableSourceScan,
    +      tableSourceTable: TableSourceTable[_],
    +      filterableSource: FilterableTableSource[_],
    +      description: String): Unit = {
    +
    +    if (filterableSource.isFilterPushedDown) {
    --- End diff --

    Will change this.
[jira] [Updated] (FLINK-5650) Flink-python tests executing cost too long time
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shijinkui updated FLINK-5650:
-----------------------------
    Labels: osx  (was: )

> Flink-python tests executing cost too long time
> -----------------------------------------------
>
>                 Key: FLINK-5650
>                 URL: https://issues.apache.org/jira/browse/FLINK-5650
>             Project: Flink
>          Issue Type: Bug
>          Components: Python API, Tests
>    Affects Versions: 1.2.0
>            Reporter: shijinkui
>            Priority: Critical
>              Labels: osx
>             Fix For: 1.2.1
>
>
> When executing `mvn clean test` in flink-python, the build waits for more than half an hour after the console output below:
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
> The stack trace is below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition [0x79fd8000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>       at java.lang.Thread.sleep(Native Method)
>       at org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>       at org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>       at org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>       at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>       at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>       at org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>       at org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> This is the full jstack output:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927371#comment-15927371 ]

ASF GitHub Bot commented on FLINK-3849:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106328751

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---
    @@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
     /** Table which defines an external table via a [[TableSource]] */
     class TableSourceTable[T](
         val tableSource: TableSource[T],
    +    val tableEnv: TableEnvironment,
    --- End diff --

    Yes, you are right. Especially since UDFs are currently registered as objects rather than classes, it's really impossible to let a TableSource support this. I will remove this field and only use built-in functions when extracting expressions from the RexProgram.

> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a TableScan that refers to a {{FilterableTableSource}}. We might need to tweak the cost model as well to push the optimizer in the right direction.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
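A minimal sketch of the filter push-down contract discussed in FLINK-3849, assuming the shape suggested by the applyPredicate call in the pull request diff: the source keeps the predicates it can evaluate itself and hands the rest back to the framework. Predicate, GreaterThan, Like and InMemorySource are hypothetical, simplified names for illustration and are not Flink classes. Note that valid Scala declares the interface with `trait` alone, without a leading `def`.

```scala
// Hypothetical, simplified predicates; Flink's Expression class is much richer.
sealed trait Predicate
case class GreaterThan(field: String, value: Int) extends Predicate
case class Like(field: String, pattern: String) extends Predicate

// Hypothetical in-memory source that can only evaluate GreaterThan itself.
case class InMemorySource(
    rows: Seq[Map[String, Int]],
    pushed: Seq[GreaterThan] = Nil) {

  // Returns a new source holding the supported predicates, plus the
  // predicates that the framework still has to evaluate after the scan.
  def applyPredicate(predicates: Seq[Predicate]): (InMemorySource, Seq[Predicate]) = {
    val supported = predicates.collect { case g: GreaterThan => g }
    val remaining = predicates.filterNot(_.isInstanceOf[GreaterThan])
    (copy(pushed = pushed ++ supported), remaining)
  }

  // Scans the rows, applying only the predicates that were pushed down.
  def scan(): Seq[Map[String, Int]] =
    rows.filter(row => pushed.forall(p => row.get(p.field).exists(_ > p.value)))
}
```

Returning a new source instead of mutating the existing one keeps the original scan reusable if the planner discards the rewritten plan; that is a design choice of this sketch, not a statement about the pull request.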
[jira] [Commented] (FLINK-5981) SSL version and cipher suites cannot be constrained as configured
[ https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927368#comment-15927368 ]

ASF GitHub Bot commented on FLINK-5981:
---------------------------------------

Github user WangTaoTheTonic commented on the issue:

    https://github.com/apache/flink/pull/3486

    Since not merged, I've turned them around. Sorry for the carelessness :(

> SSL version and cipher suites cannot be constrained as configured
> -----------------------------------------------------------------
>
>                 Key: FLINK-5981
>                 URL: https://issues.apache.org/jira/browse/FLINK-5981
>             Project: Flink
>          Issue Type: Bug
>          Components: Security
>            Reporter: Tao Wang
>            Assignee: Tao Wang
>
> I configured SSL and started a Flink job, but found that the configured properties are not applied properly:
> akka port: only the cipher suites are applied correctly; the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match what I configured
> I've found the reason:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for the blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for the akka SSL version; it is fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue in the blob server and netty server. It seems that only an akka upgrade can solve the issue on the akka side (we'll consider that later, since an upgrade is not a small change).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
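A minimal sketch of the JSSE behaviour that appears to be behind the blob server/netty part of FLINK-5981, assuming the linked Stack Overflow discussion describes the cause: the protocol name given to SSLContext.getInstance selects a context implementation, while the protocol versions and cipher suites that are actually enabled have to be set on each engine (or socket). SslConstraintSketch and its method names are hypothetical and are not taken from the Flink pull request.

```scala
import javax.net.ssl.{SSLContext, SSLEngine}

object SslConstraintSketch {

  // Creates an engine from a context with default key and trust managers.
  def newEngine(): SSLEngine = {
    val context = SSLContext.getInstance("TLSv1.2")
    context.init(null, null, null)
    context.createSSLEngine()
  }

  // Restricts the engine to exactly the configured protocols and cipher suites.
  def constrain(
      engine: SSLEngine,
      protocols: Seq[String],
      cipherSuites: Seq[String]): SSLEngine = {
    engine.setEnabledProtocols(protocols.toArray)
    engine.setEnabledCipherSuites(cipherSuites.toArray)
    engine
  }

  def main(args: Array[String]): Unit = {
    val engine = newEngine()
    println("enabled before: " + engine.getEnabledProtocols.mkString(", "))
    // For illustration only: pick one suite that this JVM reports as supported.
    val suites = engine.getSupportedCipherSuites.take(1).toSeq
    constrain(engine, Seq("TLSv1.2"), suites)
    println("enabled after:  " + engine.getEnabledProtocols.mkString(", "))
  }
}
```

The same two setters also exist on SSLSocket, SSLServerSocket and SSLParameters, so a server-socket based component could be constrained in the same way.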
[GitHub] flink issue #3486: [FLINK-5981][SECURITY]make ssl version and cipher suites ...
Github user WangTaoTheTonic commented on the issue:

    https://github.com/apache/flink/pull/3486

    Since not merged, I've turned them around. Sorry for the carelessness :(
[jira] [Commented] (FLINK-5498) Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927355#comment-15927355 ]

ASF GitHub Bot commented on FLINK-5498:
---------------------------------------

Github user lincoln-lil commented on the issue:

    https://github.com/apache/flink/pull/3379

    Some of the integration tests occasionally failed when I submitted this PR, so I configured Travis CI for my own repository and got a full pass; see https://travis-ci.org/lincoln-lil/flink/builds/208811166
    I've rebased onto master, updated this PR, and completed a successful local verification.
    ```
    [INFO] BUILD SUCCESS
    [INFO]
    [INFO] Total time: 36:39 min
    [INFO] Finished at: 2017-03-16T10:12:34+08:00
    [INFO] Final Memory: 231M/1187M
    ```
    It would be appreciated if someone could review this PR.

> Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5498
>                 URL: https://issues.apache.org/jira/browse/FLINK-5498
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> I found that the expected result of a unit test case is incorrect compared to the result in an RDBMS,
> see flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
>   ...
>   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
>   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
>   val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>
>   val expected = "Hello world,BCD\n"
>   val results = joinT.toDataSet[Row].collect()
>   TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about the 'outer join' in relational databases. The correct result of the above case should be (tested in SQL Server and MySQL; the results are the same):
> {code}
> > select c, g from tuple3 right outer join tuple5 on a=f and b<h;
> c            g
> NULL         Hallo
> NULL         Hallo Welt
> NULL         Hallo Welt wie
> NULL         Hallo Welt wie gehts?
> NULL         ABC
> Hello world  BCD
> NULL         CDE
> NULL         DEF
> NULL         EFG
> NULL         FGH
> NULL         GHI
> NULL         HIJ
> NULL         IJK
> NULL         JKL
> NULL         KLM
> {code}
> The join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent to {{rightOuterJoin('a === 'd).where('b < 'h)}}.
> The problem is rooted in the code-generated {{JoinFunction}} (see {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not match, we must emit the outer row padded with nulls instead of returning from the function without emitting anything.
> The code-generated {{JoinFunction}} does also include equality predicates. These should be removed before generating the code, e.g., in {{DataSetJoinRule}} when generating the {{DataSetJoin}} with help of {{JoinInfo.getRemaining()}}.
> More details: https://goo.gl/ngekca

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
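A minimal sketch of the semantics described in FLINK-5498, using plain Scala collections rather than the Table API: in a right outer join every right row is emitted at least once, and when no left row satisfies the whole join condition the left side is padded with None (standing in for SQL NULL). OuterJoinSketch and the sample rows are hypothetical and only illustrate why a predicate inside the join condition differs from the same predicate applied afterwards in a where clause.

```scala
object OuterJoinSketch {

  // Right outer join over in-memory rows with an arbitrary join condition.
  def rightOuterJoin[L, R](left: Seq[L], right: Seq[R])(
      cond: (L, R) => Boolean): Seq[(Option[L], R)] =
    right.flatMap { r =>
      val matches = left.filter(l => cond(l, r))
      if (matches.isEmpty) Seq((Option.empty[L], r)) // pad the left side with "null"
      else matches.map(l => (Option(l), r))
    }

  def main(args: Array[String]): Unit = {
    val left  = Seq((1, 1), (2, 5))          // columns (a, b)
    val right = Seq((1, 3), (2, 2), (3, 9))  // columns (d, h)

    // Non-equi predicate as part of the join condition: three rows survive.
    val inCondition = rightOuterJoin(left, right) { case ((a, b), (d, h)) => a == d && b < h }

    // Same predicate applied after an equi-join: padded rows are filtered out.
    val inWhere = rightOuterJoin(left, right) { case ((a, _), (d, _)) => a == d }
      .filter { case (l, (_, h)) => l.exists { case (_, b) => b < h } }

    println(inCondition.size + " rows vs " + inWhere.size + " rows")
  }
}
```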
[GitHub] flink issue #3379: [FLINK-5498] [table] Add support for left/right outer joi...
Github user lincoln-lil commented on the issue:

    https://github.com/apache/flink/pull/3379

    Some of the integration tests occasionally failed when I submitted this PR, so I configured Travis CI for my own repository and got a full pass; see https://travis-ci.org/lincoln-lil/flink/builds/208811166
    I've rebased onto master, updated this PR, and completed a successful local verification.
    ```
    [INFO] BUILD SUCCESS
    [INFO]
    [INFO] Total time: 36:39 min
    [INFO] Finished at: 2017-03-16T10:12:34+08:00
    [INFO] Final Memory: 231M/1187M
    ```
    It would be appreciated if someone could review this PR.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927353#comment-15927353 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106327707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { + // The rule can get triggered again due to the transformed "scan => filter" + // sequence created by the earlier execution of this rule when we could not + // push all the conditions into the scan + return +} + +val program = calc.getProgram +val (predicates, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +call.builder().getRexBuilder, +tableSourceTable.tableEnv.getFunctionCatalog) +if (predicates.isEmpty) { + // no condition can be translated to expression + return +} + +val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates) +// trying to apply filter push down, set the flag to true no matter whether +// we actually push any filters down. +newTableSource.setFilterPushedDown(true) + +// check whether framework still need to do a filter +val relBuilder = call.builder() +val remainingCondition = { + if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) { +relBuilder.push(scan) +(remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ unconvertedRexNodes) +.reduce((l, r) => relBuilder.and(l, r)) + } else { +null + } +} + +// check whether we still need a RexProgram. An RexProgram is needed when either +// projection or filter exists. 
+val newScan = scan.copy(scan.getTraitSet, newTableSource) +val newRexProgram = { + if (remainingCondition != null || program.getProjectList.size() > 0) { --- End diff -- Thanks for the tips, will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106327707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { + // The rule can get triggered again due to the transformed "scan => filter" + // sequence created by the earlier execution of this rule when we could not + // push all the conditions into the scan + return +} + +val program = calc.getProgram +val (predicates, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +call.builder().getRexBuilder, +tableSourceTable.tableEnv.getFunctionCatalog) +if (predicates.isEmpty) { + // no condition can be translated to expression + return +} + +val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates) +// trying to apply filter push down, set the flag to true no matter whether +// we actually push any filters down. +newTableSource.setFilterPushedDown(true) + +// check whether framework still need to do a filter +val relBuilder = call.builder() +val remainingCondition = { + if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) { +relBuilder.push(scan) +(remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ unconvertedRexNodes) +.reduce((l, r) => relBuilder.and(l, r)) + } else { +null + } +} + +// check whether we still need a RexProgram. An RexProgram is needed when either +// projection or filter exists. 
+val newScan = scan.copy(scan.getTraitSet, newTableSource) +val newRexProgram = { + if (remainingCondition != null || program.getProjectList.size() > 0) { --- End diff -- Thanks for the tips, will change this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
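The control flow of `pushFilterIntoScan` in the diff above (offer the predicates to the source, then AND together whatever is left for the framework) can be sketched without Calcite or Flink. This is only a rough illustration: `PushDownSketch`, `Split`, and the rule that the source understands only predicates on column `a` are assumptions made for the example, and predicates are plain strings rather than `Expression` or `RexNode` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the push-down rule; none of these names are Flink or Calcite API.
final class PushDownSketch {

    /** Result of offering predicates to the source. */
    static final class Split {
        final List<String> pushed = new ArrayList<>();
        final List<String> remaining = new ArrayList<>();
    }

    /** Assumption for this sketch: the source can only evaluate predicates on column "a". */
    static Split applyPredicate(List<String> predicates) {
        Split split = new Split();
        for (String predicate : predicates) {
            if (predicate.startsWith("a ")) {
                split.pushed.add(predicate);
            } else {
                split.remaining.add(predicate);
            }
        }
        return split;
    }

    /** ANDs together what the framework must still evaluate; null means no filter is left. */
    static String remainingCondition(List<String> remaining, List<String> unconverted) {
        List<String> all = new ArrayList<>(remaining);
        all.addAll(unconverted);
        return all.isEmpty() ? null : String.join(" AND ", all);
    }

    public static void main(String[] args) {
        Split split = applyPredicate(Arrays.asList("a > 1", "b < 5"));
        System.out.println("pushed: " + split.pushed);
        System.out.println("remaining: "
            + remainingCondition(split.remaining, Arrays.asList("udf(c)")));
    }
}
```

As in the diff, a `null` remaining condition signals that no filter has to stay in the plan above the scan.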
[jira] [Commented] (FLINK-5883) Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time
[ https://issues.apache.org/jira/browse/FLINK-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927325#comment-15927325 ] ASF GitHub Bot commented on FLINK-5883: --- Github user lincoln-lil commented on the issue: https://github.com/apache/flink/pull/3392 I would appreciate it if someone could merge this PR. > Re-adding the Exception-thrown code for ListKeyGroupedIterator when the > iterator is requested the second time > - > > Key: FLINK-5883 > URL: https://issues.apache.org/jira/browse/FLINK-5883 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Reporter: lincoln.lee >Assignee: lincoln.lee > > As of FLINK-1023, ListKeyGroupedIterator threw a TraversableOnceException > when the iterator was requested a second time. That check was lost > unexpectedly in FLINK-1110, so this change adds it back. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
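The behaviour described in FLINK-5883 (fail when the iterator is requested a second time) can be illustrated with a small standalone sketch. It does not use Flink's `ListKeyGroupedIterator` or `TraversableOnceException`; `OnceIterable` is a hypothetical class and `IllegalStateException` stands in for the Flink exception.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a traverse-once iterable; not Flink code.
final class OnceIterable<T> implements Iterable<T> {
    private final List<T> values;
    private boolean iteratorRequested = false;

    OnceIterable(List<T> values) {
        this.values = values;
    }

    @Override
    public Iterator<T> iterator() {
        if (iteratorRequested) {
            // Stand-in for the exception that the issue wants to restore.
            throw new IllegalStateException("The iterator can only be requested once.");
        }
        iteratorRequested = true;
        return values.iterator();
    }
}

final class OnceIterableDemo {
    /** Returns true if the second iterator() call is rejected. */
    static boolean secondRequestFails() {
        OnceIterable<Integer> group = new OnceIterable<>(Arrays.asList(1, 2, 3));
        group.iterator(); // first request succeeds
        try {
            group.iterator(); // second request must fail
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondRequestFails());
    }
}
```

Failing loudly on the second request avoids silently handing out an iterator over data that has already been consumed.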
[GitHub] flink issue #3392: [FLINK-5883] Re-adding the Exception-thrown code for List...
Github user lincoln-lil commented on the issue: https://github.com/apache/flink/pull/3392 I would appreciate it if someone could merge this PR.
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927322#comment-15927322 ] ASF GitHub Bot commented on FLINK-5658: --- Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106324953 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate",
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106324953 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927319#comment-15927319 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324883 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala --- @@ -58,16 +60,24 @@ class BatchTableSourceScan( ) } + override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = { +new BatchTableSourceScan( + cluster, + traitSet, + getTable, + newTableSource.asInstanceOf[BatchTableSource[_]] +) + } + override def explainTerms(pw: RelWriter): RelWriter = { -super.explainTerms(pw) +val terms = super.explainTerms(pw) .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) +tableSource.explainTerms(terms) --- End diff -- will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927318#comment-15927318 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324868 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -39,4 +39,6 @@ trait TableSource[T] { /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ def getReturnType: TypeInformation[T] + /** Describes the table source */ + def explainTerms(pw: RelWriter): RelWriter = pw --- End diff -- Makes sense to me, will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction.
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324883 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala --- @@ -58,16 +60,24 @@ class BatchTableSourceScan( ) } + override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = { +new BatchTableSourceScan( + cluster, + traitSet, + getTable, + newTableSource.asInstanceOf[BatchTableSource[_]] +) + } + override def explainTerms(pw: RelWriter): RelWriter = { -super.explainTerms(pw) +val terms = super.explainTerms(pw) .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) +tableSource.explainTerms(terms) --- End diff -- will change this.
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324868 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -39,4 +39,6 @@ trait TableSource[T] { /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ def getReturnType: TypeInformation[T] + /** Describes the table source */ + def explainTerms(pw: RelWriter): RelWriter = pw --- End diff -- Makes sense to me, will change this.
[jira] [Commented] (FLINK-5668) passing taskmanager configuration through taskManagerEnv instead of file
[ https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927308#comment-15927308 ] Bill Liu commented on FLINK-5668: - It still doesn't solve my problem. I want to start a Flink job without HDFS (or any writable shared file system). > passing taskmanager configuration through taskManagerEnv instead of file > > > Key: FLINK-5668 > URL: https://issues.apache.org/jira/browse/FLINK-5668 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: Bill Liu > Original Estimate: 48h > Remaining Estimate: 48h > > When creating a Flink cluster on YARN, the JobManager depends on HDFS to share > taskmanager-conf.yaml with the TaskManager. > It would be better to serve taskmanager-conf.yaml from the JobManager web server > instead of HDFS, which would reduce the HDFS dependency at job startup.
[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
[ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927296#comment-15927296 ] ASF GitHub Bot commented on FLINK-4364: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3151 @tillrohrmann, just a reminder to review my modifications when you have free time, because this blocks my next pull request for the heartbeat between `TaskManager` and `ResourceManager`. Thank you! > Implement TaskManager side of heartbeat from JobManager > --- > > Key: FLINK-4364 > URL: https://issues.apache.org/jira/browse/FLINK-4364 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: zhijiang >Assignee: zhijiang > > The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and > the {{TaskManager}} will report metrics info for each heartbeat.
[jira] [Commented] (FLINK-5865) Throw original exception in states
[ https://issues.apache.org/jira/browse/FLINK-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927295#comment-15927295 ] ASF GitHub Bot commented on FLINK-5865: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3380 I prefer to throw more detailed exceptions, e.g. `IncompatibleTypeSerializerException`, `StateAccessException` and `StateNotFoundException`. They all extend `FlinkRuntimeException`. Users can get more information from these exceptions if they catch them. > Throw original exception in states > -- > > Key: FLINK-5865 > URL: https://issues.apache.org/jira/browse/FLINK-5865 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently all exceptions thrown in RocksDB states are converted to > {{RuntimeException}}. This is unnecessary and prints useless stack traces in the > log. > I think it's better to throw the original exception, without any wrapping.
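The proposal in the comment (specific unchecked exceptions under a common runtime base class) could look roughly like the sketch below. The class names mirror the ones mentioned in the comment but are defined locally for illustration; `SketchRuntimeException` is an assumed stand-in for the common base class, and none of this is taken from the pull request.

```java
import java.io.IOException;

// Hypothetical base class standing in for the common runtime exception.
class SketchRuntimeException extends RuntimeException {
    SketchRuntimeException(String message, Throwable cause) {
        super(message, cause);
    }
}

// More specific exceptions let callers react to the concrete failure.
class StateAccessException extends SketchRuntimeException {
    StateAccessException(String message, Throwable cause) {
        super(message, cause);
    }
}

class StateNotFoundException extends SketchRuntimeException {
    StateNotFoundException(String stateName) {
        super("State not found: " + stateName, null);
    }
}

final class StateExceptionDemo {
    /** Runs a state access and reports which kind of failure occurred. */
    static String describe(Runnable stateAccess) {
        try {
            stateAccess.run();
            return "ok";
        } catch (StateNotFoundException e) {
            return e.getMessage();
        } catch (StateAccessException e) {
            // The original cause stays reachable instead of being lost in a generic wrapper.
            return "access failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> {
            throw new StateAccessException("get failed", new IOException("disk error"));
        }));
        System.out.println(describe(() -> {
            throw new StateNotFoundException("counter");
        }));
    }
}
```

Because the specific types are unchecked, existing call sites keep compiling, while callers that care can catch the narrower type.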
[GitHub] flink issue #3151: [FLINK-4364] [runtime] [FLIP-6] Implement TaskManager sid...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3151 @tillrohrmann, just a reminder to review my modifications when you have free time, because this blocks my next pull request for the heartbeat between `TaskManager` and `ResourceManager`. Thank you!
[GitHub] flink issue #3380: [FLINK-5865][state] Throw original exception in the state...
Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/3380 I prefer to throw more detailed exceptions, e.g. `IncompatibleTypeSerializerException`, `StateAccessException` and `StateNotFoundException`. They all extend `FlinkRuntimeException`. Users can get more information from these exceptions if they catch them.
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106322568 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala --- @@ -42,63 +41,40 @@ class DataSetCalc( traitSet: RelTraitSet, input: RelNode, rowRelDataType: RelDataType, -private[flink] val calcProgram: RexProgram, // for tests +calcProgram: RexProgram, ruleDescription: String) - extends SingleRel(cluster, traitSet, input) + extends Calc(cluster, traitSet, input, calcProgram) --- End diff -- This is because I want to unify the PushFilterIntoScan rule's code for both batch and stream mode. While executing the rule, we may need to create a new copy of the DataSetCalc or DataStreamCalc. It is easier to let these two classes inherit from `Calc` and use `Calc.copy` to create the copied instance. I did encounter a problem after I changed the hierarchy: some unit tests failed because the plan changed. But that was because we did not calculate the cost for Calc correctly. I added some logic to `CommonCalc.computeSelfCost`, and everything works fine.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927290#comment-15927290 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106322568 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala --- @@ -42,63 +41,40 @@ class DataSetCalc( traitSet: RelTraitSet, input: RelNode, rowRelDataType: RelDataType, -private[flink] val calcProgram: RexProgram, // for tests +calcProgram: RexProgram, ruleDescription: String) - extends SingleRel(cluster, traitSet, input) + extends Calc(cluster, traitSet, input, calcProgram) --- End diff -- This is because I want to unify the PushFilterIntoScan rule's code for both batch and stream mode. While executing the rule, we may need to create a new copy of the DataSetCalc or DataStreamCalc. It is easier to let these two classes inherit from `Calc` and use `Calc.copy` to create the copied instance. I did encounter a problem after I changed the hierarchy: some unit tests failed because the plan changed. But that was because we did not calculate the cost for Calc correctly. I added some logic to `CommonCalc.computeSelfCost`, and everything works fine. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. 
> The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927277#comment-15927277 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106321885 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala --- @@ -24,7 +24,7 @@ package org.apache.flink.table.sources * * @tparam T The return type of the [[ProjectableTableSource]]. */ -trait ProjectableTableSource[T] { +trait ProjectableTableSource[T] extends TableSource[T] { --- End diff -- It's because I want to unify the PushProjectIntoScan rule code for both batch and stream mode. Once we push a projection down into a table source, we should create not only a new TableScan instance but also a new TableSource instance. The code looks like:
```
val newTableSource = originTableSource.projectFields(usedFields)
// create a new scan with the new TableSource instance
val newScan = scan.copy(scan.getTraitSet, newTableSource)
```
At first the `projectFields` method returned `ProjectableTableSource`, which is not a `TableSource`, so I let `ProjectableTableSource` inherit from `TableSource`. But I just noticed we can simply let `projectFields` return a `TableSource`, which resolves the problem. Will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. 
> The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction.
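The design choice discussed in the comment (let `projectFields` return the general source type instead of making the projectable trait extend it) can be sketched with plain interfaces. All names below (`TableSourceSketch`, `ProjectableSketch`, `CsvSketch`, `ScanSketch`) are hypothetical and written in Java for brevity; they are not the Flink traits from the diff.

```java
import java.util.Arrays;

// Hypothetical stand-in for a general table source.
interface TableSourceSketch {
    String[] fieldNames();
}

// The projection capability is independent of the source interface;
// only the return type ties the two together.
interface ProjectableSketch {
    TableSourceSketch projectFields(int[] fields);
}

final class CsvSketch implements TableSourceSketch, ProjectableSketch {
    private final String[] names;

    CsvSketch(String... names) {
        this.names = names;
    }

    @Override
    public String[] fieldNames() {
        return names;
    }

    @Override
    public TableSourceSketch projectFields(int[] fields) {
        String[] projected = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            projected[i] = names[fields[i]];
        }
        // A new source instance is returned; the original is left unchanged.
        return new CsvSketch(projected);
    }
}

final class ScanSketch {
    final TableSourceSketch source;

    ScanSketch(TableSourceSketch source) {
        this.source = source;
    }

    /** Creates a new scan over the given source, as scan.copy(...) does in the comment. */
    ScanSketch copy(TableSourceSketch newSource) {
        return new ScanSketch(newSource);
    }

    public static void main(String[] args) {
        CsvSketch origin = new CsvSketch("a", "b", "c");
        ScanSketch scan = new ScanSketch(origin);
        ScanSketch newScan = scan.copy(origin.projectFields(new int[] {0, 2}));
        System.out.println(Arrays.toString(newScan.source.fieldNames()));
    }
}
```

With this shape the rule code only needs the returned source type to build the new scan, so the projection interface does not have to inherit from the source interface.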
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106321885 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala --- @@ -24,7 +24,7 @@ package org.apache.flink.table.sources * * @tparam T The return type of the [[ProjectableTableSource]]. */ -trait ProjectableTableSource[T] { +trait ProjectableTableSource[T] extends TableSource[T] { --- End diff -- It's because I want to unify the PushProjectIntoScan rule code for both batch and stream mode. Once we push a projection down into a table source, we should create not only a new TableScan instance but also a new TableSource instance. The code looks like:
```
val newTableSource = originTableSource.projectFields(usedFields)
// create a new scan with the new TableSource instance
val newScan = scan.copy(scan.getTraitSet, newTableSource)
```
At first the `projectFields` method returned `ProjectableTableSource`, which is not a `TableSource`, so I let `ProjectableTableSource` inherit from `TableSource`. But I just noticed we can simply let `projectFields` return a `TableSource`, which resolves the problem. Will change this.
[jira] [Closed] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Hutchison closed FLINK-6026. - Resolution: Not A Bug > Return type of flatMap with lambda function not correctly resolved > -- > > Key: FLINK-6026 > URL: https://issues.apache.org/jira/browse/FLINK-6026 > Project: Flink > Issue Type: Bug > Components: Core, DataSet API, DataStream API >Affects Versions: 1.2.0 >Reporter: Luke Hutchison >Priority: Minor > > I get an error if I try naming a flatMap operation: > {code} > DataSet> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > Type mismatch: cannot convert from > FlatMapOperator ,Object> to > DataSet > > If I try to do it as two steps, I get the error that DataSet does not have a > .name(String) method: > {code} > DataSet > y = x.flatMap((t, out) -> out.collect(t)); > y.name("op"); > {code} > If I use Eclipse type inference on x, it shows me that the output type is not > correctly inferred: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)); > y.name("op"); // This now works, but "Object" is not the output type > {code} > However, these steps still cannot be chained -- the following still gives an > error: > {code} > FlatMapOperator , Object> y = x.flatMap((t, out) -> > out.collect(t)).name("op"); > {code} > i.e. first you have to assign the result to a field, so that the type is > fully specified; then you can name the operation. 
> And the weird thing is that you can give the correct, more specific type for > the local variable, without a type narrowing error: > {code} > FlatMapOperator , Tuple2 > y = > x.flatMap((t, out) -> out.collect(t)); > y.name("op"); // This works, although chaining these two lines still does > not work > {code} > If the types of the lambda args are specified, then everything works: > {code} > DataSet > y = x.flatMap((Tuple2 t, > Collector > out) -> out.collect(t)).name("op"); > {code} > So, at least two things are going on here: > (1) type inference is not working correctly for the lambda parameters > (2) this breaks type inference for intermediate expressions, unless the type > can be resolved using a local variable definition > Is this a bug in the type signature of flatMap? (Or a compiler bug or > limitation, or a fundamental limitation of Java 8 type inference?) > It seems odd that the type of a local variable definition can make the result > of the flatMap operator *more* specific, taking the type from > {code} > FlatMapOperator , Object> > {code} > to > {code} > FlatMapOperator , Tuple2 > > {code} > i.e. if the output type is provided in the local variable definition, it is > properly unified with the type of the parameter t of collect(t), however that > type is not propagated out of that call. > Can anything be done about this in Flink? I have hit this problem a few times. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6026) Return type of flatMap with lambda function not correctly resolved
[ https://issues.apache.org/jira/browse/FLINK-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927065#comment-15927065 ] Luke Hutchison commented on FLINK-6026: --- Makes sense. I was wondering if there was some sort of type signature tweak that could be performed to make this work. I guess not -- thanks anyway! -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3414) Add Scala API for CEP's pattern definition
[ https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927063#comment-15927063 ] Ivan Mushketyk commented on FLINK-3414: --- Hi [~dawidwys] I don't have time to work on this task now, so I don't mind if you take over. I'll reassign it to you. > Add Scala API for CEP's pattern definition > -- > > Key: FLINK-3414 > URL: https://issues.apache.org/jira/browse/FLINK-3414 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Ivan Mushketyk >Priority: Minor > > Currently, the CEP library only supports a Java API to specify complex event > patterns. In order to make it a bit less verbose for Scala users, it would be > nice to also add a Scala API for the CEP library. > A Scala API would also allow to pass Scala's anonymous functions as filter > conditions or as a select function, for example, or to use partial functions > to distinguish between different events. > Furthermore, the Scala API could be designed to feel a bit more like a DSL: > {code} > begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || > "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-3414) Add Scala API for CEP's pattern definition
[ https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-3414: - Assignee: Dawid Wysakowicz (was: Ivan Mushketyk) > Add Scala API for CEP's pattern definition > -- > > Key: FLINK-3414 > URL: https://issues.apache.org/jira/browse/FLINK-3414 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Dawid Wysakowicz >Priority: Minor > > Currently, the CEP library only supports a Java API to specify complex event > patterns. In order to make it a bit less verbose for Scala users, it would be > nice to also add a Scala API for the CEP library. > A Scala API would also allow to pass Scala's anonymous functions as filter > conditions or as a select function, for example, or to use partial functions > to distinguish between different events. > Furthermore, the Scala API could be designed to feel a bit more like a DSL: > {code} > begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || > "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks on...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3523 Can we slightly adapt the test to better target the typical use case: - Original job has some stateless ops (no uid), and some stateful ones (with uid) - Create a modified job that has the same stateful ones (same uids) but different stateless ones Otherwise this looks good.
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926831#comment-15926831 ] ASF GitHub Bot commented on FLINK-5985: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3523 Can we slightly adapt the test to target more the typical use case: - Original job has some stateless ops (no uid), and some stateful ones (with uid) - Create a modified job that has the same stateful ones (same uids) but different stateless ones Otherwise this looks good. > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads problematic cancellation behavior
[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926820#comment-15926820 ] Robert Metzger commented on FLINK-5048: --- I'm not aware of any user on 1.1 affected by this. The change is quite involved and could potentially break existing code. Therefore, I would not backport that change. > Kafka Consumer (0.9/0.10) threading model leads problematic cancellation > behavior > - > > Key: FLINK-5048 > URL: https://issues.apache.org/jira/browse/FLINK-5048 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.5 > > > The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that > operates the KafkaConsumer. That thread is shielded from interrupts, because > the Kafka Consumer has not been handling thread interrupts well. > Since that thread is also the thread that emits records, it may block in the > network stack (backpressure) or in chained operators. The latter case leads to > situations where cancellations get very slow unless that thread would be > interrupted (which it cannot be). > I propose to change the thread model as follows: > - A spawned consumer thread pulls from the KafkaConsumer and pushes its > pulled batch of records into a blocking queue (size one) > - The main thread of the task will pull the record batches from the > blocking queue and emit the records. > This actually allows for some additional I/O overlap while limiting the > additional memory consumption - only two batches are ever held, one being > fetched and one being emitted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
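The proposed threading model for FLINK-5048 can be illustrated with plain JDK classes. The following is only a minimal sketch, not the actual Flink change: the class `HandoverSketch`, the `END` marker, and the use of `ArrayBlockingQueue` with capacity one are assumptions made for illustration, and pre-built lists of strings stand in for batches pulled from a KafkaConsumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class HandoverSketch {
    // Hypothetical end-of-input marker, compared by identity only.
    private static final List<String> END = new ArrayList<>();

    /** Passes all batches through a size-one queue and returns the emitted records in order. */
    public static List<String> run(List<List<String>> batches) {
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

        // Stand-in for the spawned consumer thread: it only fetches and hands over batches.
        Thread fetcher = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    handover.put(batch); // blocks while the previous batch has not been taken
                }
                handover.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread");
        fetcher.setDaemon(true);
        fetcher.start();

        // Stand-in for the task's main thread: take() is interruptible, so a
        // cancellation that interrupts this thread is not blocked by the fetcher.
        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = handover.take();
                if (batch == END) {
                    break;
                }
                emitted.addAll(batch);
            }
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while emitting", e);
        }
        return emitted;
    }
}
```

The point of the split is that the thread which may block in downstream operators is the interruptible main thread, while the shielded thread only ever blocks on the queue.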
[jira] [Commented] (FLINK-5962) Cancel checkpoint canceller tasks in CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-5962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926812#comment-15926812 ] Stephan Ewen commented on FLINK-5962: - Fixed in 1.2 via 9d59e008d8849cdfe2daf302e251454435bb997f > Cancel checkpoint canceller tasks in CheckpointCoordinator > -- > > Key: FLINK-5962 > URL: https://issues.apache.org/jira/browse/FLINK-5962 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann >Assignee: Stephan Ewen >Priority: Critical > > The {{CheckpointCoordinator}} registers a canceller task for each running > checkpoint. The canceller task's responsibility is to cancel a checkpoint if > it takes too long to complete. We should cancel this task as soon as the > checkpoint has been completed, because otherwise we will keep many canceller > tasks around. This can eventually lead to an OOM exception. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
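The idea behind FLINK-5962 (cancel the timeout task once the checkpoint completes) can be sketched with the JDK scheduler. This is a hedged illustration only and does not reproduce the referenced commit: `CancellerSketch`, its method, and the ten-minute timeout are hypothetical, and a no-op lambda stands in for the real canceller.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancellerSketch {
    /** Returns how many canceller tasks are still queued after all checkpoints completed. */
    public static int pendingAfterCompletion(int checkpoints) {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        // Without this policy, cancelled tasks stay in the queue until their delay elapses.
        timer.setRemoveOnCancelPolicy(true);
        try {
            for (int i = 0; i < checkpoints; i++) {
                // One canceller per checkpoint; here it would abort a checkpoint that takes too long.
                ScheduledFuture<?> canceller =
                        timer.schedule(() -> { }, 10, TimeUnit.MINUTES);

                // The checkpoint completes in time, so its canceller is no longer needed.
                canceller.cancel(false);
            }
            return timer.getQueue().size();
        } finally {
            timer.shutdownNow();
        }
    }
}
```

If the `cancel(false)` call is dropped, every completed checkpoint leaves one task queued for the full timeout, which is the accumulation the issue describes.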
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/0a501e9f7f56baba2905002b74746998458db007#commitcomment-21337504 In flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java on line 84: I think this results in unnecessary boxing of `time`.
[jira] [Commented] (FLINK-6044) TypeSerializerSerializationProxy.read() doesn't verify the read buffer length
[ https://issues.apache.org/jira/browse/FLINK-6044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926785#comment-15926785 ] Stephan Ewen commented on FLINK-6044: - The {{read()}} / {{readFully()}} issue has occurred before and will most likely occur again. A simple thing we can do is "forbid" that method in the same way as String/Byte conversions without a charset. Take a look at the {{CheckForbiddenMethodsUsage}} test in the manual tests. We can add such a test for the {{read()}} method. We only need to remember to execute the manual tests on every release. > TypeSerializerSerializationProxy.read() doesn't verify the read buffer length > - > > Key: FLINK-6044 > URL: https://issues.apache.org/jira/browse/FLINK-6044 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.2.0 > Environment: Ubuntu server 12.04.5 64 bit > java version "1.8.0_111" > Java(TM) SE Runtime Environment (build 1.8.0_111-b14) > Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode) >Reporter: Avihai Berkovitz >Assignee: Stefan Richter >Priority: Critical > Fix For: 1.3.0 > > > The read() method of TypeSerializerSerializationProxy creates a buffer and > tries to fill it by calling the read() method of the given DataInputView, but > never checks the return value. The actual size read from the stream might be > smaller than the buffer size, and the rest of the buffer is filled with > zeroes, causing the deserialization to fail. > It happened to me using a RocksDB state backend backed by S3. The setup was > done according to > https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#s3-simple-storage-service > and everything worked correctly until I upgraded to Flink 1.2.0. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
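The difference between `read()` and `readFully()` that FLINK-6044 describes can be reproduced with plain `java.io` streams. The sketch below is an illustration under assumptions, not Flink code: `ReadFullySketch` and `TrickleStream` are hypothetical names, and the one-byte-per-call stream merely simulates a source (such as a network-backed file system) that returns short reads.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class ReadFullySketch {
    /** Hypothetical stream that hands out at most one byte per bulk read call. */
    static final class TrickleStream extends InputStream {
        private final InputStream in;

        TrickleStream(byte[] data) {
            this.in = new ByteArrayInputStream(data);
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return in.read(b, off, Math.min(len, 1));
        }
    }

    /** A single read() call: returns the number of bytes actually placed in the buffer. */
    public static int unsafeRead(byte[] data) {
        try {
            byte[] buffer = new byte[data.length];
            return new TrickleStream(data).read(buffer); // may be fewer than buffer.length
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** readFully() loops until the buffer is full, or throws EOFException if it cannot be. */
    public static byte[] safeRead(byte[] data) {
        try {
            byte[] buffer = new byte[data.length];
            new DataInputStream(new TrickleStream(data)).readFully(buffer);
            return buffer;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a four-byte input, the single `read()` fills only the first byte and leaves the rest of the buffer zeroed, which matches the symptom in the issue description, while `readFully()` returns the complete content.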
[jira] [Commented] (FLINK-6044) TypeSerializerSerializationProxy.read() doesn't verify the read buffer length
[ https://issues.apache.org/jira/browse/FLINK-6044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926779#comment-15926779 ] Stephan Ewen commented on FLINK-6044: - [~srichter] I think we should try and safeguard Flink against running into the same bug again. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926772#comment-15926772 ] ASF GitHub Bot commented on FLINK-6018: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3534 This change may alter the format of savepoints, because it now forwards type-registrations to the Kryo serializer, which it did not do before. Since we announced savepoint compatibility, we need to understand when this would happen. Is it something that could not really happen before (because whenever the method was called, the serializer was properly initialized outside), or did this actually occur in some cases? @sunjincheng121 Can you share how you stumbled across this bug? Was it a code cleanup, or a bug you encountered while running Flink? > Properly initialise StateDescriptor in > AbstractStateBackend.getPartitionedState() > - > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method is as follows: > {code} > // TODO: This is wrong, it should throw an exception that the initialization has not properly happened > if (!stateDescriptor.isSerializerInitialized()) { > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = typeInfo.createSerializer(executionConfig); > } else { > throw new IllegalStateException( >
"Cannot initialize serializer after TypeInformation was dropped during serialization"); > } > } > } > {code} > That is, the `initializeSerializerUnlessSet` method already checks `serializer == null`, so the outer check is redundant. I suggest improving this code in one of the following ways: > approach 1: > Following the `TODO` comment, we throw an exception > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); > } > {code} > approach 2: > Try to initialize and remove the `if (!stateDescriptor.isSerializerInitialized()) {` logic. > {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > If we use approach 2, I suggest that > `AbstractKeyedStateBackend` add a `private final ExecutionConfig > executionConfig` property. Then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Do the above suggestions seem reasonable? > Feedback and corrections are welcome. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3534 This change may alter the format of savepoints, because it now forwards type-registrations to the Kryo serializer, which it did not do before. Since we announced savepoint compatibility, we need to understand when this would happen. Is it something that could not really happen before (because whenever the method was called, the serializer was properly initialized outside), or did this actually occur in some cases? @sunjincheng121 Can you share how you stumbled across this bug? Was it a code cleanup, or a bug you encountered while running Flink?
[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926770#comment-15926770 ] ASF GitHub Bot commented on FLINK-6018: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3534 There is also another method in `AbstractKeyedStateBackend`: `getOrCreateKeyedState` which does not silently use an empty `ExecutionConfig` but throws an exception. That method could also use the same execution config, now that it is available. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
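The second approach discussed for FLINK-6018 (the backend keeps the job's execution config and passes it to the descriptor, instead of creating an empty one on the spot) can be sketched with stand-in classes. Everything below is hypothetical and only mirrors the shape of the discussion: `Config`, `Descriptor`, and `Backend` are simplified placeholders for `ExecutionConfig`, `StateDescriptor`, and `AbstractKeyedStateBackend`, and a string stands in for the serializer.

```java
public class DescriptorInitSketch {
    /** Hypothetical stand-in for ExecutionConfig. */
    static final class Config {
        final String name;

        Config(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for StateDescriptor with a lazily created serializer. */
    static final class Descriptor {
        private String serializer; // records which config created it

        boolean isSerializerInitialized() {
            return serializer != null;
        }

        void initializeSerializerUnlessSet(Config config) {
            if (serializer == null) {
                serializer = "serializer[" + config.name + "]";
            }
        }

        String serializer() {
            return serializer;
        }
    }

    /** Hypothetical stand-in for the keyed state backend, holding the job's config. */
    static final class Backend {
        private final Config executionConfig;

        Backend(Config executionConfig) {
            this.executionConfig = executionConfig;
        }

        String getPartitionedState(Descriptor descriptor) {
            // No outer isSerializerInitialized() check: the callee already guards against re-initialization.
            descriptor.initializeSerializerUnlessSet(executionConfig);
            return descriptor.serializer();
        }
    }

    public static String demo() {
        Backend backend = new Backend(new Config("job-config"));
        return backend.getPartitionedState(new Descriptor());
    }
}
```

In this sketch the serializer is derived from the job's config, so settings such as type registrations are not silently lost, which is the concern raised in the review comments.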
[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3534 There is also another method in `AbstractKeyedStateBackend`: `getOrCreateKeyedState` which does not silently use an empty `ExecutionConfig` but throws an exception. That method could also use the same execution config, now that it is available.
[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926768#comment-15926768 ] ASF GitHub Bot commented on FLINK-6018: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3534 I think this could have really used a test. It is a tricky bug that will easily be re-introduced without a test that guards the fix. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3534 I think this could have really used a test. It is a tricky bug that will easily be re-introduced without a test that guards the fix.
[jira] [Commented] (FLINK-6063) HA Configuration doesn't work with Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926682#comment-15926682 ] Till Rohrmann commented on FLINK-6063: -- Hi Razvan, sometimes it can take a little while until ZooKeeper notifies the TaskManagers about the new leader. In the meantime it tries to reconnect to the old master. But as soon as the new leader information is written to ZooKeeper and sent to the TaskManagers they should try to connect to the new leader. How long have you tried it out? Maybe it would be helpful to also see the JobManager log which became the new leader and more of the TaskManager logs. > HA Configuration doesn't work with Flink 1.2 > > > Key: FLINK-6063 > URL: https://issues.apache.org/jira/browse/FLINK-6063 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0 >Reporter: Razvan >Priority: Critical > > I have a setup with flink 1.2 cluster, made up of 3 JobManagers and 2 > TaskManagers. I start the Zookeeper Quorum from JobManager1, I get > confirmation Zookeeper starts on the other 2 JobManagers then I start a Flink > job on this JobManager1. > > The flink-conf.yaml is the same on all 5 VMs (also everything else related > to flink because I copied the folder across all VMs as suggested in > tutorials) this means jobmanager.rpc.address: points to JobManager1 > everywhere. > If I turn off the VM running JobManager1 I would expect Zookeeper to say one > of the remaining JobManagers is the leader and the TaskManagers should > reconnect to it. 
Instead a new leader is elected but the slaves keep > connecting to the old master > 2017-03-15 10:28:28,655 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Async > calls on Source: Custom Source -> Flat Map (1/1) > 2017-03-15 10:28:38,534 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Disassociated] > 2017-03-15 10:28:46,606 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:28:52,431 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:29:02,435 WARN akka.remote.ReliableDeliverySupervisor > - Association with remote system > [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] > ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused > by: [Connection refused: /1.2.3.4:44779] > 2017-03-15 10:29:10,489 INFO > org.apache.flink.runtime.taskmanager.TaskManager - TaskManager > akka://flink/user/taskmanager disconnects from JobManager > akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its > leadership. > 2017-03-15 10:29:10,490 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Cancelling > all computations and discarding all cached data. > 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Source: Custom Source > -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223). 
> 2017-03-15 10:29:10,491 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Flat Map (1/1) > (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED. > java.lang.Exception: TaskManager akka://flink/user/taskmanager > disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: > Old JobManager lost its leadership. > at > org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) >
[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface
[ https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926684#comment-15926684 ] Bowen Li commented on FLINK-5095: - Agreed that the MetricReporter interface is neither extensible nor scalable, and it will definitely cause problems as Flink's metrics and monitoring systems grow. I'll take some time to think about this as well as https://issues.apache.org/jira/browse/FLINK-6053 > Add explicit notifyOfAddedX methods to MetricReporter interface > --- > > Key: FLINK-5095 > URL: https://issues.apache.org/jira/browse/FLINK-5095 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.3 >Reporter: Chesnay Schepler >Priority: Minor > > I would like to start a discussion on the MetricReporter interface, > specifically the methods that notify a reporter of added or removed metrics. > Currently, the methods are defined as follows: > {code} > void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group); > void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup > group); > {code} > All metrics, regardless of their actual type, are passed to the reporter with > these methods.
> Since the different metric types have to be handled differently we thus force > every reporter to do something like this: > {code} > if (metric instanceof Counter) { > Counter c = (Counter) metric; > // deal with counter > } else if (metric instanceof Gauge) { > // deal with gauge > } else if (metric instanceof Histogram) { > // deal with histogram > } else if (metric instanceof Meter) { > // deal with meter > } else { > // log something or throw an exception > } > {code} > This has a few issues > * the instanceof checks and castings are unnecessary overhead > * it requires the implementer to be aware of every metric type > * it encourages throwing an exception in the final else block > We could remedy all of these by reworking the interface to contain explicit > add/remove methods for every metric type. This would however be a breaking > change and blow up the interface to 12 methods from the current 4. We could > also add a RichMetricReporter interface with these methods, which would > require relatively little changes but add additional complexity. > I was wondering what other people think about this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
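As a rough illustration of the "explicit methods per metric type" idea in the FLINK-5095 description, the sketch below moves the instanceof chain into one shared dispatcher so that individual reporters only implement typed callbacks. All types here are simplified stand-ins, not Flink's real interfaces, and the names `RichMetricReporter`, `dispatchAdded`, and `RecordingReporter` are hypothetical. Only two metric types are shown to keep it short.

```java
import java.util.ArrayList;
import java.util.List;

public class TypedReporterSketch {
    // Simplified stand-ins for the metric types; NOT Flink's real interfaces.
    public interface Metric {}
    public interface Counter extends Metric { long getCount(); }
    public interface Gauge<T> extends Metric { T getValue(); }

    // Hypothetical reporter interface with one explicit method per metric type.
    public interface RichMetricReporter {
        void notifyOfAddedCounter(Counter counter, String metricName);
        void notifyOfAddedGauge(Gauge<?> gauge, String metricName);
    }

    // The type dispatch happens once, here, instead of inside every reporter.
    // Returns false for an unknown metric type rather than throwing.
    public static boolean dispatchAdded(Metric metric, String metricName, RichMetricReporter reporter) {
        if (metric instanceof Counter) {
            reporter.notifyOfAddedCounter((Counter) metric, metricName);
            return true;
        } else if (metric instanceof Gauge) {
            reporter.notifyOfAddedGauge((Gauge<?>) metric, metricName);
            return true;
        }
        return false;
    }

    // Example reporter that only records which typed callback was invoked.
    public static class RecordingReporter implements RichMetricReporter {
        public final List<String> seen = new ArrayList<>();

        @Override
        public void notifyOfAddedCounter(Counter counter, String metricName) {
            seen.add("counter:" + metricName);
        }

        @Override
        public void notifyOfAddedGauge(Gauge<?> gauge, String metricName) {
            seen.add("gauge:" + metricName);
        }
    }
}
```

Whether such an interface would replace the existing one or sit next to it is exactly the open question raised in the issue; the sketch takes no position on that.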
[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything
[ https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926665#comment-15926665 ] Bowen Li commented on FLINK-6053: - Cool. Adding a NumberGauge or NumericGauge was my first impression when running into this problem. But not being able to extend AbstractReporter might not be good. I need more time to think through a solution. And I appreciate your input! BTW, can you please add me as a contributor to this project, so I can assign tickets to myself? > Gauge should only take subclasses of Number, rather than everything > -- > > Key: FLINK-6053 > URL: https://issues.apache.org/jira/browse/FLINK-6053 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.2.0 >Reporter: Bowen Li > Fix For: 2.0.0 > > > Currently, Flink's Gauge is defined as > ```java > public interface Gauge<T> extends Metric { > T getValue(); > } > ``` > But it doesn't make sense to have Gauge take generic types other than Number. > It also blocks me from finishing FLINK-6013, because I cannot assume Gauge is > only about Number. So the class should be like > ```java > public interface Gauge<T extends Number> extends Metric { > T getValue(); > } > ```
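To make the motivation in FLINK-6053 concrete, here is a minimal sketch of what a `Number` bound on the gauge's type parameter would allow a reporter to do. The `Metric` and `Gauge` types are stand-ins modelled on the snippet in the issue, not Flink's actual classes, and `report` is a hypothetical helper.

```java
public class NumericGaugeSketch {
    // Stand-ins modelled on the interfaces quoted in the issue; not Flink's real classes.
    public interface Metric {}

    public interface Gauge<T extends Number> extends Metric {
        T getValue();
    }

    // With the bound in place, any gauge value can be converted to a double
    // without instanceof checks or casts.
    public static double report(Gauge<? extends Number> gauge) {
        return gauge.getValue().doubleValue();
    }
}
```

With an unbounded `T`, the same helper would have to inspect the runtime type of the value and decide what to do with strings or other objects.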
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926662#comment-15926662 ] Stephan Ewen commented on FLINK-5756: - Just validated that compactions actually help, but compactions are equally slow when many values are merged. It's also the case that repeated gets on the same key take a long time, not only the first get. > When there are many values under the same key in ListState, > RocksDBStateBackend performances poor > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operation performs very poorly. > I also tried RocksDB directly, using version 4.11.2, and the performance is also very > poor. The problem is likely related to RocksDB's own get() operation > after using merge(). The problem may affect the window operation's > performance when the size is very large using ListState. I tried merging 5 > values under the same key in RocksDB, and it took 120 seconds to execute the get() > operation.
> /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - 
beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code}
[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3483#discussion_r106138741 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.net.URL; +import java.util.Collections; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest { + + /** +* This test takes a snapshot that was created with Flink 1.2 and tries to restore it in Flink 1.3 to check +* the backwards compatibility of the serialization format of {@link StateTable}s. +*/ + @Test + public void testHeapKeyedStateBackend1_2To1_3BackwardsCompatibility() throws Exception { + + ClassLoader cl = getClass().getClassLoader(); + URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot"); --- End diff -- Please also add the code (commented out) that is used to re-generate this snapshot. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3483#discussion_r106137929 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java --- @@ -0,0 +1,1021 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.TreeSet; + +/** + * Basis for Flink's in-memory state tables with copy-on-write support. This map does not support null values for --- End diff -- Is this the basis or the actual implementation?
[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3483#discussion_r106132349 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyContext.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; + +/** + * This interface is the current context of a keyed state. It provides information about the currently selected key in + * the context, the corresponding key-group, and other key and key-grouping related information. + * + * The typical use case for this interface is providing a view on the current-key selection aspects of + * {@link org.apache.flink.runtime.state.KeyedStateBackend}. + */ +public interface KeyContext { --- End diff -- We had a quick offline discussion about this because there already exists a `KeyContext` in Flink. Maybe you can rename this one to `InternalKeyContext` for now because it serves somewhat different purposes.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106238200 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate import java.util -import org.apache.calcite.rel.`type`._ +import scala.collection.JavaConversions.asScalaBuffer +import scala.collection.mutable.ArrayBuffer + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rex.RexInputRef +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.rex.RexWindowBound +import org.apache.calcite.sql.SqlAggFunction +import org.apache.calcite.sql.SqlKind import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.sql.fun._ -import org.apache.calcite.sql.{SqlAggFunction, SqlKind} -import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT +import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN +import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL +import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE +import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT +import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER +import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT +import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT --- End diff -- after an inspection, I realized that the imports you mentioned are used. I think there is no unused import at this moment. Am I missing something?
[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926658#comment-15926658 ] ASF GitHub Bot commented on FLINK-5653: --- Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106238200 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate import java.util -import org.apache.calcite.rel.`type`._ +import scala.collection.JavaConversions.asScalaBuffer +import scala.collection.mutable.ArrayBuffer + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rex.RexInputRef +import org.apache.calcite.rex.RexLiteral +import org.apache.calcite.rex.RexWindowBound +import org.apache.calcite.sql.SqlAggFunction +import org.apache.calcite.sql.SqlKind import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.sql.fun._ -import org.apache.calcite.sql.{SqlAggFunction, SqlKind} -import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT +import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN +import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL +import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE +import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT +import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER +import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT +import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT --- End diff -- after an inspection, I realized that the imports you mentioned are used. I think there is no unused import at this moment. Am I missing something? 
> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > > > Key: FLINK-5653 > URL: https://issues.apache.org/jira/browse/FLINK-5653 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Stefano Bortoli > > The goal of this issue is to add support for OVER ROWS aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING > AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5656) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
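The semantics of `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` with an optional `PARTITION BY` can be sketched in plain Java as a small per-partition buffer that holds at most x + 1 rows. This is only an illustration of the window semantics from the FLINK-5653 description, not the operator from the pull request; the class name `BoundedOverRowsSketch` and its method are hypothetical, and rows are assumed to be fed in processing-time order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class BoundedOverRowsSketch {
    // x in "ROWS BETWEEN x PRECEDING AND CURRENT ROW"
    private final int preceding;
    // One row buffer per partition key (the PARTITION BY column)
    private final Map<String, Deque<Long>> buffers = new HashMap<>();

    public BoundedOverRowsSketch(int preceding) {
        this.preceding = preceding;
    }

    /** Processes one row in arrival order and returns {sumB, minB} over the current window. */
    public long[] onRow(String partitionKey, long b) {
        Deque<Long> window = buffers.computeIfAbsent(partitionKey, k -> new ArrayDeque<>());
        window.addLast(b);
        if (window.size() > preceding + 1) {
            window.removeFirst(); // evict the row that fell out of the window
        }
        long sum = 0;
        long min = Long.MAX_VALUE;
        for (long v : window) {
            sum += v;
            min = Math.min(min, v);
        }
        return new long[] {sum, min};
    }
}
```

The sketch recomputes both aggregates on every row, which costs O(x) per row. An actual operator would presumably keep the buffer in Flink's keyed state and could maintain SUM incrementally, while MIN needs the buffered rows anyway once a row is evicted.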
[jira] [Created] (FLINK-6065) Make TransportClient for ES5 pluggable
Robert Metzger created FLINK-6065: - Summary: Make TransportClient for ES5 pluggable Key: FLINK-6065 URL: https://issues.apache.org/jira/browse/FLINK-6065 Project: Flink Issue Type: Improvement Components: ElasticSearch Connector, Streaming Connectors Reporter: Robert Metzger This JIRA is based on a user request: http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454 Currently, in the {{Elasticsearch5ApiCallBridge}} the {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this client pluggable to allow using other clients such as the {{PreBuiltXPackTransportClient}}.
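One possible shape for "pluggable" in FLINK-6065 is to hand the bridge a factory instead of letting it construct the client itself. The sketch below uses a stand-in `Client` interface and a hypothetical `ApiCallBridge` class so that it compiles without the Elasticsearch dependency; none of these names come from the Flink connector or from Elasticsearch.

```java
import java.util.Map;
import java.util.function.Function;

public class PluggableClientSketch {
    // Stand-in for the Elasticsearch client type; purely illustrative.
    public interface Client {
        String describe();
    }

    // Hypothetical bridge that receives the client factory instead of hardcoding one client class.
    public static class ApiCallBridge {
        private final Function<Map<String, String>, Client> clientFactory;

        public ApiCallBridge(Function<Map<String, String>, Client> clientFactory) {
            this.clientFactory = clientFactory;
        }

        // Builds the client from the user-supplied configuration via the injected factory.
        public Client createClient(Map<String, String> clientConfig) {
            return clientFactory.apply(clientConfig);
        }
    }
}
```

In a real connector the factory would likely also have to be serializable, since sinks are shipped to the TaskManagers; `java.util.function.Function` alone does not guarantee that.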
[jira] [Closed] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-6018. --- Resolution: Fixed Fix Version/s: 1.3.0 Fixed on master in 264f6df8e0c0fb2f9dfb0cd9beab9d380dc8e00c > Properly initialise StateDescriptor in > AbstractStateBackend.getPartitionedState() > - > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method, as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354:stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 354 } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = > typeInfo.createSerializer(executionConfig); > } else { > throw new IllegalStateException( > "Cannot initialize serializer > after TypeInformation was dropped during serialization"); > } > } > } > {code} > that is, in the `initializeSerializerUnlessSet` method, The `serializer` has > been checked by `serializer == null`.So I hope this code has a little > improvement to the following: > approach 1: > According to the `TODO` information we throw an exception > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer of the > descriptor has not been initialized!"); > } > {code} > approach 2: > Try to initialize and 
remove `if (!stateDescriptor.isSerializerInitialized()) > {` logic. > {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > Meanwhile, If we use the approach 2, I suggest that > `AbstractKeyedStateBackend` add a `private final ExecutionConfig > executionConfig` property. then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Are the above suggestions reasonable for you? > Welcome anybody's feedback and corrections. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
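The two approaches proposed in FLINK-6018 can be compared side by side with a stripped-down descriptor. In this sketch `Descriptor` is a hypothetical stand-in for `StateDescriptor`: a `String` plays the role of the serializer, a `Supplier<String>` plays the role of the type information, and the `ExecutionConfig` argument is omitted. Only the control flow mirrors the snippets quoted in the issue.

```java
import java.util.function.Supplier;

public class SerializerInitSketch {
    // Hypothetical stand-in for StateDescriptor with the two methods quoted in the issue.
    public static class Descriptor {
        private String serializer;                // stands in for the TypeSerializer
        private final Supplier<String> typeInfo;  // stands in for the TypeInformation

        public Descriptor(Supplier<String> typeInfo) {
            this.typeInfo = typeInfo;
        }

        public boolean isSerializerInitialized() {
            return serializer != null;
        }

        // Already checks for null itself, so an outer isSerializerInitialized() guard is redundant.
        public void initializeSerializerUnlessSet() {
            if (serializer == null) {
                if (typeInfo != null) {
                    serializer = typeInfo.get();
                } else {
                    throw new IllegalStateException(
                        "Cannot initialize serializer after TypeInformation was dropped during serialization");
                }
            }
        }
    }

    // Approach 1: fail fast if the caller forgot to initialize the serializer.
    public static void approach1(Descriptor descriptor) {
        if (!descriptor.isSerializerInitialized()) {
            throw new IllegalStateException("The serializer of the descriptor has not been initialized!");
        }
    }

    // Approach 2: always call the idempotent initializer and drop the outer check.
    public static void approach2(Descriptor descriptor) {
        descriptor.initializeSerializerUnlessSet();
    }
}
```

Approach 1 surfaces a missing initialization as an error at the call site, while approach 2 silently repairs it; which behaviour is preferable is the question the issue puts up for discussion.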
[jira] [Commented] (FLINK-6018) Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()
[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926635#comment-15926635 ] ASF GitHub Bot commented on FLINK-6018: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3534 Thanks for your contribution! I just merged. Could you please close this PR? > Properly initialise StateDescriptor in > AbstractStateBackend.getPartitionedState() > - > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method, as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354:stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 354 } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = > typeInfo.createSerializer(executionConfig); > } else { > throw new IllegalStateException( > "Cannot initialize serializer > after TypeInformation was dropped during serialization"); > } > } > } > {code} > that is, in the `initializeSerializerUnlessSet` method, The `serializer` has > been checked by `serializer == null`.So I hope this code has a little > improvement to the following: > approach 1: > According to the `TODO` information we throw an exception > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer 
of the > descriptor has not been initialized!"); > } > {code} > approach 2: > Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) > {` logic. > {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > Meanwhile, If we use the approach 2, I suggest that > `AbstractKeyedStateBackend` add a `private final ExecutionConfig > executionConfig` property. then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Are the above suggestions reasonable for you? > Welcome anybody's feedback and corrections. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3534: [FLINK-6018][statebackend] Properly initialise StateDescr...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3534 Thanks for your contribution! I just merged. Could you please close this PR?
[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3483#discussion_r106136980 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.net.URL; +import java.util.Collections; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class HeapKeyedStateBackendSnapshotBackwardsCompatibilityTest { --- End diff -- All other tests for backwards compatibility end in `*MigrationTest`, we should probably stick to this naming pattern.
[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3483#discussion_r106133243 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java --- @@ -47,77 +44,62 @@ /** * Creates a new key/value state for the given hash map of key/value pairs. * -* @param backend The state backend backing that created this state. * @param stateDesc The state identifier for the state. This contains name * and can create a default state value. * @param stateTable The state tab;e to use in this kev/value state. May contain initial state. */ protected AbstractHeapMergingState( - KeyedStateBackend backend, SD stateDesc, StateTablestateTable, TypeSerializer keySerializer, TypeSerializer namespaceSerializer) { - super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer); + super(stateDesc, stateTable, keySerializer, namespaceSerializer); + this.mergeTransformation = new MergeTransformation(); } + private final MergeTransformation mergeTransformation; --- End diff -- Curious placement of field.
[GitHub] flink pull request #3483: [FLINK-5979] Backwards compatibility for HeapKeyed...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3483#discussion_r106139194 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateEntry.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import java.util.Objects; + +/** + * One full entry in a state table. Consists of an immutable key (not null), an immutable namespace (not null), and + * a state that can be mutable and null. + * + * @param type of key + * @param type of namespace + * @param type of value + */ +public class StateEntry{ --- End diff -- Is this used anywhere other than as a base class for `StateTableEntry` in `CopyOnWriteStateTable`?