[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
[ https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167365#comment-16167365 ]

ASF GitHub Bot commented on FLINK-7386:
---

Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4675

    @rmetzger @tzulitai I created this PR to try to fix [FLINK-7386](https://issues.apache.org/jira/browse/FLINK-7386). The PR does the following:

    1. Adds a `createRequestIndex` method to `ElasticsearchApiCallBridge`, so that each connector version can use it to create its own `RequestIndexer` instance.
    2. Adds a flink-connector-elasticsearch5.3 project containing a `BulkProcessorIndexer`, which converts each `ActionRequest` to a `DocWriteRequest`.

    What do you think? Please have a look when you're free, thanks.

> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
> ---
>
>                 Key: FLINK-7386
>                 URL: https://issues.apache.org/jira/browse/FLINK-7386
>             Project: Flink
>          Issue Type: Improvement
>          Components: ElasticSearch Connector
>            Reporter: Dawid Wysakowicz
>            Assignee: Fang Yong
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
[ https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167357#comment-16167357 ]

ASF GitHub Bot commented on FLINK-7386:
---

GitHub user zjureel opened a pull request:

    https://github.com/apache/flink/pull/4675

    [FLINK-7386] Fix Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

    ## What is the purpose of the change

    Add flink-connector-elasticsearch5.3 to support Elasticsearch 5.3 and later versions.

    ## Brief change log

      - *Add createRequestIndex method in ElasticsearchApiCallBridge*
      - *Add flink-connector-elasticsearch5.3 project*
      - *Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert ActionRequest to DocWriteRequest*

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *Add ElasticsearchSinkITCase test case*
      - *Add ELasticsearchSinkExample in connector-elasticsearch5.3 to send requests to Elasticsearch 5.3 and later versions*

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-7386

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4675

commit 9dd30ccb5f5bd9940c8f1cfea4ffeb256d564862
Author: zjureel
Date:   2017-09-15T03:51:35Z

    [FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add createRequestIndex method

commit 5936bc7734557f75dc4b0c06cfc31b0b0e49a91a
Author: zjureel
Date:   2017-09-15T03:55:16Z

    [FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and later versions

commit dbc87bb1dd361a2e840bd382814a780aa96a45c2
Author: zjureel
Date:   2017-09-15T04:42:44Z

    [FLINK-7386] add test case for ES53

commit 74972ab9798e4e9173ad4cc4a6ec6bdf1390f98a
Author: zjureel
Date:   2017-09-15T05:33:43Z

    [FLINK-7386] add document for ES5.3

> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client
> ---
>
>                 Key: FLINK-7386
>                 URL: https://issues.apache.org/jira/browse/FLINK-7386
>             Project: Flink
>          Issue Type: Improvement
>          Components: ElasticSearch Connector
>            Reporter: Dawid Wysakowicz
>            Assignee: Fang Yong
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109
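The change log above describes an indexer that still accepts `ActionRequest` but hands `DocWriteRequest` to the bulk processor. The following is a minimal sketch of that adapter idea only. Every type in it (`ActionRequest`, `DocWriteRequest`, `IndexRequest`, `SearchRequest`, `BulkProcessor`, `BulkProcessorIndexer`) is a hypothetical stand-in declared locally so the example compiles without Elasticsearch or Flink on the classpath; it is not the code from the PR, and the real client types have far richer interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: local stand-ins mimic the rough shape of the client types. */
final class BulkIndexerSketch {

    /** Stand-in for the broad request type that existing sink functions emit. */
    interface ActionRequest {}

    /** Stand-in for the narrower type assumed to be accepted by the newer bulk processor. */
    interface DocWriteRequest {
        String index();
    }

    /** Stand-in for a write request: it is both an ActionRequest and a DocWriteRequest. */
    static final class IndexRequest implements ActionRequest, DocWriteRequest {
        private final String index;

        IndexRequest(String index) {
            this.index = index;
        }

        @Override
        public String index() {
            return index;
        }
    }

    /** Stand-in for a request that is not a document write. */
    static final class SearchRequest implements ActionRequest {}

    /** Stand-in for a bulk processor whose add() only takes DocWriteRequest. */
    static final class BulkProcessor {
        final List<DocWriteRequest> buffered = new ArrayList<>();

        void add(DocWriteRequest request) {
            buffered.add(request);
        }
    }

    /** The adapter: keeps an ActionRequest-based signature and narrows before delegating. */
    static final class BulkProcessorIndexer {
        private final BulkProcessor bulkProcessor;

        BulkProcessorIndexer(BulkProcessor bulkProcessor) {
            this.bulkProcessor = bulkProcessor;
        }

        void add(ActionRequest... requests) {
            for (ActionRequest request : requests) {
                if (!(request instanceof DocWriteRequest)) {
                    // Fail loudly instead of silently dropping an unsupported request.
                    throw new IllegalArgumentException(
                        "Not a document write request: " + request.getClass().getSimpleName());
                }
                bulkProcessor.add((DocWriteRequest) request);
            }
        }
    }

    /** Indexes two documents through the adapter and reports how many were buffered. */
    static int demo() {
        BulkProcessor processor = new BulkProcessor();
        new BulkProcessorIndexer(processor).add(new IndexRequest("a"), new IndexRequest("b"));
        return processor.buffered.size();
    }
}
```

Rejecting requests that cannot be narrowed is one possible design choice for the sketch; whether the PR throws, logs, or skips such requests is not stated in this thread.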
[jira] [Commented] (FLINK-7627) SingleElementIterable should implement with Serializable
[ https://issues.apache.org/jira/browse/FLINK-7627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167317#comment-16167317 ]

ASF GitHub Bot commented on FLINK-7627:
---

GitHub user hequn8128 opened a pull request:

    https://github.com/apache/flink/pull/4674

    [FLINK-7627] [table] SingleElementIterable should implement with Serializable

    ## What is the purpose of the change

    *This pull request is a bugfix that makes `SingleElementIterable` implement `Serializable`.*

    ## Brief change log

      - *Make SingleElementIterable implement Serializable*

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *Added a test that validates that `SingleElementIterable` can be serialized; otherwise an exception will be thrown*

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hequn8128/flink 7627

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4674.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4674

commit b1af2d612265f0727048388fea7695f42ba897cd
Author: 军长
Date:   2017-09-15T03:10:49Z

    [FLINK-7627] [table] SingleElementIterable should implement with Serializable

> SingleElementIterable should implement with Serializable
> ---
>
>                 Key: FLINK-7627
>                 URL: https://issues.apache.org/jira/browse/FLINK-7627
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>
> {{SingleElementIterable}} is used to merge accumulators, and it should be
> serializable because it is serialized when a checkpoint is taken.
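The verification step above (checking that the iterable survives serialization) can be illustrated with a small, self-contained sketch. `SingleElementIterableSketch` is a hypothetical class written for this example, not Flink's `SingleElementIterable`, and the round trip assumes plain Java serialization via `ObjectOutputStream`, which may differ from how a given state backend actually writes state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;

/** Hypothetical single-element iterable that can be written with Java serialization. */
final class SingleElementIterableSketch<T> implements Iterable<T>, Serializable {

    private static final long serialVersionUID = 1L;

    // The element must itself be serializable for the round trip to succeed.
    private T element;

    void setElement(T element) {
        this.element = element;
    }

    @Override
    public Iterator<T> iterator() {
        return Collections.singletonList(element).iterator();
    }

    /** Writes the value to a byte array and reads it back. */
    @SuppressWarnings("unchecked")
    static <S extends Serializable> S roundTrip(S value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (S) in.readObject();
        }
    }

    /** Serializes an iterable holding 42 and returns the element read back from the copy. */
    static long demo() {
        SingleElementIterableSketch<Long> original = new SingleElementIterableSketch<>();
        original.setElement(42L);
        try {
            return roundTrip(original).iterator().next();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

Without `implements Serializable` on the class, the `writeObject` call in this sketch would fail with a `NotSerializableException`, which is the kind of failure the test described in the PR is meant to catch.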
[jira] [Created] (FLINK-7627) SingleElementIterable should implement with Serializable
Hequn Cheng created FLINK-7627:
---

             Summary: SingleElementIterable should implement with Serializable
                 Key: FLINK-7627
                 URL: https://issues.apache.org/jira/browse/FLINK-7627
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Hequn Cheng
            Assignee: Hequn Cheng

{{SingleElementIterable}} is used to merge accumulators, and it should be serializable because it is serialized when a checkpoint is taken.
[jira] [Assigned] (FLINK-5858) Support multiple sinks in same execution DAG
[ https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he reassigned FLINK-5858:
---

    Assignee: godfrey he

> Support multiple sinks in same execution DAG
> ---
>
>                 Key: FLINK-5858
>                 URL: https://issues.apache.org/jira/browse/FLINK-5858
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: godfrey he
>
> When the writeToSink method is called to write a Table (with a TableSource) to a
> TableSink, the Table is translated to a DataSet or DataStream. If writeToSink is
> called more than once (to write to different sinks), the Table is also translated
> more than once, and the final execution graph is split into separate DAGs.
> For example:
> {code:title=Example.scala|borderStyle=solid}
> // Imports are not part of the original report; they are assumed here for the
> // Table API packages of that Flink release.
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.table.sources.CsvTableSource
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> val csvTableSource = new CsvTableSource(
>   "/tmp/words",
>   Array("first", "id", "score", "last"),
>   Array(
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO,
>     BasicTypeInfo.DOUBLE_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO
>   ),
>   fieldDelim = "#"
> )
>
> tEnv.registerTableSource("csv_source", csvTableSource)
> val resultTable = tEnv.scan("csv_source")
>   .groupBy('first)
>   .select('first, 'score.sum)
>
> // Two sinks on the same Table: each call triggers its own translation.
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
>
> println(tEnv.explain(resultTable))
> {code}
> Results:
> == Abstract Syntax Tree ==
> LogicalProject(first=[$0], TMP_1=[$1])
>   LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
>     LogicalProject(first=[$0], score=[$2])
>       LogicalTableScan(table=[[csv_source]])
> == Optimized Logical Plan ==
> DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
>   BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
> == Physical Execution Plan ==
> {color:red}
> Stage 6 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>   Stage 5 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode : PIPELINED
>     driver_strategy : Map
>     Partitioning : RANDOM_PARTITIONED
>     Stage 4 : GroupCombine
>       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>       ship_strategy : Forward
>       exchange_mode : PIPELINED
>       driver_strategy : Sorted Combine
>       Partitioning : RANDOM_PARTITIONED
>       Stage 3 : GroupReduce
>         content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>         ship_strategy : Hash Partition on [0]
>         exchange_mode : PIPELINED
>         driver_strategy : Sorted Group Reduce
>         Partitioning : RANDOM_PARTITIONED
>         Stage 2 : Map
>           content : to: Row(f0: String, f1: Double)
>           ship_strategy : Forward
>           exchange_mode : PIPELINED
>           driver_strategy : Map
>           Partitioning : RANDOM_PARTITIONED
>           Stage 1 : Map
>             content : Map at emitDataSet(CsvTableSink.scala:67)
>             ship_strategy : Forward
>             exchange_mode : PIPELINED
>             driver_strategy : Map
>             Partitioning : RANDOM_PARTITIONED
>             Stage 0 : Data Sink
>               content : TextOutputFormat (/tmp/wordcount1) - UTF-8
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               Partitioning : RANDOM_PARTITIONED
> {color:red}
> Stage 13 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>   Stage 12 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode :
[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167242#comment-16167242 ]

Dian Fu edited comment on FLINK-7606 at 9/15/17 2:18 AM:
---

I think this is by design; otherwise users could not get the timed-out partially matched sequences, as they may be cleared by the timers. As for the memory leak, it seems that you are using the {{MemoryStateBackend}}. Is it used only for testing, or will it also be used in your production environment? I think with {{RocksDBStateBackend}} there will be no memory leak issue.

was (Author: dian.fu):
I think this is by design otherwise users could not get the timed out partially matched sequences as they will be cleared by the timers. For the memory leak issue, it seems that you are using the {{MemoryStateBackend}}, is it just used for test or will also used in your product environment? I think with {{RocksDBStateBackend}}, there will be no memory leak issue.

> CEP operator leaks state
> ---
>
>                 Key: FLINK-7606
>                 URL: https://issues.apache.org/jira/browse/FLINK-7606
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Matteo Ferrario
>         Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two
> incoming messages match the pattern only if they arrive within a certain time
> window.
> What we've seen is that for every key present in the messages, an NFA object
> is instantiated in the NestedMapsStateTable and is never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages
> that don't match the pattern, the memory grows (I suppose that the state
> of the NFA is updated), but it is not cleaned up even after the 5-minute time
> window defined in the "within" clause has passed.
> If needed, I can provide more details about the job we've implemented and
> also the screenshots of the memory leak.
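The report above describes per-key state that is not released after the "within" window has passed. The following toy model shows, in general terms, one way such growth can arise: if expired entries for a key are dropped only when that key receives a new event, keys that go quiet keep their state until something sweeps all keys. This is an illustration under stated assumptions, not Flink's CEP operator, its NFA, or any state backend; `WithinWindowStateSketch` and all of its methods are hypothetical, and whether this mechanism matches the reported job is not established in the thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy model of keyed partial-match state bounded by a time window. */
final class WithinWindowStateSketch {

    private final long windowMillis;

    /** Per-key start timestamps of partial matches, oldest first. */
    private final Map<String, Deque<Long>> partialMatches = new HashMap<>();

    WithinWindowStateSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Event-driven path: expired entries are dropped only for the key that sees the event. */
    void onEvent(String key, long timestamp) {
        Deque<Long> starts = partialMatches.computeIfAbsent(key, k -> new ArrayDeque<>());
        dropExpired(starts, timestamp);
        starts.addLast(timestamp);
    }

    /** Timer-driven path: visits every key, drops expired entries, removes empty keys. */
    void onTimer(long now) {
        Iterator<Map.Entry<String, Deque<Long>>> it = partialMatches.entrySet().iterator();
        while (it.hasNext()) {
            Deque<Long> starts = it.next().getValue();
            dropExpired(starts, now);
            if (starts.isEmpty()) {
                it.remove();
            }
        }
    }

    private void dropExpired(Deque<Long> starts, long now) {
        while (!starts.isEmpty() && now - starts.peekFirst() > windowMillis) {
            starts.removeFirst();
        }
    }

    int keyCount() {
        return partialMatches.size();
    }

    /** Returns the number of keys held before and after a sweep at ten minutes. */
    static int[] demo() {
        WithinWindowStateSketch state = new WithinWindowStateSketch(5 * 60 * 1000L);
        for (int i = 0; i < 3; i++) {
            state.onEvent("key-" + i, 0L);
        }
        int before = state.keyCount();
        state.onTimer(10 * 60 * 1000L);
        int after = state.keyCount();
        return new int[] {before, after};
    }
}
```

In this model, calling only `onEvent` with ever-new keys makes `keyCount()` grow without bound, while a periodic `onTimer` sweep keeps it proportional to the number of keys active within the window.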
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167224#comment-16167224 ]

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139049572

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.Expression
    +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * A [[TableSource]] extending this class is a partition table,
    + * and will get the relevant partitions about the query.
    + *
    + * @tparam T The return type of the [[TableSource]].
    + */
    +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] {
    +
    +  private var relBuilder: Option[RelBuilder] = None
    +
    +  /**
    +    * Get all partitions belong to this table
    +    *
    +    * @return All partitions belong to this table
    +    */
    +  def getAllPartitions: JList[Partition]
    +
    +  /**
    +    * Get partition field names.
    +    *
    +    * @return Partition field names.
    +    */
    +  def getPartitionFieldNames: Array[String]
    +
    +  /**
    +    * Get partition field types.
    +    *
    +    * @return Partition field types.
    +    */
    +  def getPartitionFieldTypes: Array[TypeInformation[_]]
    +
    +  /**
    +    * Whether drop partition predicates after apply partition pruning.
    +    *
    +    * @return true only if the result is correct without partition predicate
    +    */
    +  def supportDropPartitionPredicate: Boolean = false
    +
    +  /**
    +    * @return Pruned partitions
    +    */
    +  def getPrunedPartitions: JList[Partition]
    +
    +  /**
    +    * @return True if apply partition pruning
    +    */
    +  def isPartitionPruned: Boolean
    +
    +  /**
    +    * If a partitionable table source which can't apply non-partition filters should not pick any
    +    * predicates.
    +    * If a partitionable table source which can apply non-partition filters should check and pick
    +    * only predicates this table source can support.
    +    *
    +    * After trying to push pruned-partitions and predicates down, we should return a new
    +    * [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates.
    +    * Even if we actually pushed nothing down, it is recommended that we still return a new
    +    * [[TableSource]] instance since we will mark the returned instance as filter push down has
    +    * been tried.
    +    *
    +    * We also should note to not changing the form of the predicates passed in. It has been
    +    * organized in CNF conjunctive form, and we should only take or leave each element from the
    +    * list. Don't try to reorganize the predicates if you are absolutely confident with that.
    +    *
    +    * @param partitionPruned Whether partition pruning is applied.
    --- End diff --

    We should make this flag clearer. If it is meant to indicate whether partition pruning has been applied, I would say it should always be true, because by the time this method is called the framework has at least tried to apply partition pruning.

> support partition pruning on Table API & SQL
> ---
>
>                 Key: FLINK-5859
>                 URL: https://issues.apache.org/jira/browse/FLINK-5859
>             Project: Flink
>          Issue Type: New Feature
>
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167219#comment-16167219 ]

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139049316

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.Expression
    +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * A [[TableSource]] extending this class is a partition table,
    + * and will get the relevant partitions about the query.
    + *
    + * @tparam T The return type of the [[TableSource]].
    + */
    +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] {
    +
    +  private var relBuilder: Option[RelBuilder] = None
    +
    +  /**
    +    * Get all partitions belong to this table
    +    *
    +    * @return All partitions belong to this table
    +    */
    +  def getAllPartitions: JList[Partition]
    +
    +  /**
    +    * Get partition field names.
    +    *
    +    * @return Partition field names.
    +    */
    +  def getPartitionFieldNames: Array[String]
    +
    +  /**
    +    * Get partition field types.
    +    *
    +    * @return Partition field types.
    +    */
    +  def getPartitionFieldTypes: Array[TypeInformation[_]]
    +
    +  /**
    +    * Whether drop partition predicates after apply partition pruning.
    +    *
    +    * @return true only if the result is correct without partition predicate
    +    */
    +  def supportDropPartitionPredicate: Boolean = false
    +
    +  /**
    +    * @return Pruned partitions
    +    */
    +  def getPrunedPartitions: JList[Partition]
    +
    +  /**
    +    * @return True if apply partition pruning
    +    */
    +  def isPartitionPruned: Boolean
    +
    +  /**
    +    * If a partitionable table source which can't apply non-partition filters should not pick any
    +    * predicates.
    +    * If a partitionable table source which can apply non-partition filters should check and pick
    +    * only predicates this table source can support.
    +    *
    +    * After trying to push pruned-partitions and predicates down, we should return a new
    +    * [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates.
    +    * Even if we actually pushed nothing down, it is recommended that we still return a new
    +    * [[TableSource]] instance since we will mark the returned instance as filter push down has
    +    * been tried.
    +    *
    +    * We also should note to not changing the form of the predicates passed in. It has been
    +    * organized in CNF conjunctive form, and we should only take or leave each element from the
    +    * list. Don't try to reorganize the predicates if you are absolutely confident with that.
    +    *
    +    * @param partitionPruned Whether partition pruning is applied.
    +    * @param prunedPartitions Remaining partitions after partition pruning applied.
    --- End diff --

    The definition of "prunedPartitions" looks contradictory here. I think we should stick to a single definition: "prunedPartitions" should represent either all partitions that have been pruned away, or all remaining partitions that survive pruning.

> support partition pruning on Table API & SQL
> ---
>
>                 Key: FLINK-5859
>                 URL:
[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139049316 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{ArrayList => JArrayList, List => JList} + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner} + +import scala.collection.JavaConverters._ + +/** + * A [[TableSource]] extending this class is a partition table, + * and will get the relevant partitions about the query. + * + * @tparam T The return type of the [[TableSource]]. 
+ */ +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] { + + private var relBuilder: Option[RelBuilder] = None + + /** +* Get all partitions belong to this table +* +* @return All partitions belong to this table +*/ + def getAllPartitions: JList[Partition] + + /** +* Get partition field names. +* +* @return Partition field names. +*/ + def getPartitionFieldNames: Array[String] + + /** +* Get partition field types. +* +* @return Partition field types. +*/ + def getPartitionFieldTypes: Array[TypeInformation[_]] + + /** +* Whether drop partition predicates after apply partition pruning. +* +* @return true only if the result is correct without partition predicate +*/ + def supportDropPartitionPredicate: Boolean = false + + /** +* @return Pruned partitions +*/ + def getPrunedPartitions: JList[Partition] + + /** +* @return True if apply partition pruning +*/ + def isPartitionPruned: Boolean + + /** +* If a partitionable table source which can't apply non-partition filters should not pick any +* predicates. +* If a partitionable table source which can apply non-partition filters should check and pick +* only predicates this table source can support. +* +* After trying to push pruned-partitions and predicates down, we should return a new +* [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates. +* Even if we actually pushed nothing down, it is recommended that we still return a new +* [[TableSource]] instance since we will mark the returned instance as filter push down has +* been tried. +* +* We also should note to not changing the form of the predicates passed in. It has been +* organized in CNF conjunctive form, and we should only take or leave each element from the +* list. Don't try to reorganize the predicates if you are absolutely confident with that. +* +* @param partitionPruned Whether partition pruning is applied. +* @param prunedPartitions Remaining partitions after partition pruning applied. 
--- End diff -- It looks like the definition of "prunedPartitions" is contradictory here. I think we should stick to one definition: either "prunedPartitions" represents all partitions that have been pruned away, or it represents all remaining partitions that survive pruning. ---
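To make the two possible readings concrete, here is a minimal Java sketch (the PR itself is Scala). The class, field, and method names are hypothetical and not part of the Flink API; partitions are represented as plain strings for illustration only. It splits a partition list into the partitions that survive pruning and those that are pruned away, so each set carries an unambiguous name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration; this is not Flink's PartitionPruner.
final class PruningResult {
    // Partitions that match the partition predicate and still have to be read.
    final List<String> remainingPartitions = new ArrayList<>();
    // Partitions that do not match and can be skipped entirely.
    final List<String> removedPartitions = new ArrayList<>();

    static PruningResult prune(List<String> allPartitions, Predicate<String> partitionPredicate) {
        PruningResult result = new PruningResult();
        for (String partition : allPartitions) {
            if (partitionPredicate.test(partition)) {
                result.remainingPartitions.add(partition);
            } else {
                result.removedPartitions.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("dt=2017-09-13", "dt=2017-09-14");
        PruningResult result = prune(all, p -> p.endsWith("14"));
        System.out.println("remaining: " + result.remainingPartitions);
        System.out.println("removed:   " + result.removedPartitions);
    }
}
```

With names like these, a parameter documented as "remaining partitions after pruning" could not be mistaken for the set that was pruned away.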
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167216#comment-16167216 ] ASF GitHub Bot commented on FLINK-5859: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139048604 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{ArrayList => JArrayList, List => JList} + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner} + +import scala.collection.JavaConverters._ + +/** + * A [[TableSource]] extending this class is a partition table, + * and will get the relevant partitions about the query. + * + * @tparam T The return type of the [[TableSource]]. 
+ */ +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] { + + private var relBuilder: Option[RelBuilder] = None + + /** +* Get all partitions belong to this table +* +* @return All partitions belong to this table +*/ + def getAllPartitions: JList[Partition] + + /** +* Get partition field names. +* +* @return Partition field names. +*/ + def getPartitionFieldNames: Array[String] + + /** +* Get partition field types. +* +* @return Partition field types. +*/ + def getPartitionFieldTypes: Array[TypeInformation[_]] + + /** +* Whether drop partition predicates after apply partition pruning. +* +* @return true only if the result is correct without partition predicate +*/ + def supportDropPartitionPredicate: Boolean = false + + /** +* @return Pruned partitions +*/ + def getPrunedPartitions: JList[Partition] + + /** +* @return True if apply partition pruning +*/ + def isPartitionPruned: Boolean + + /** +* If a partitionable table source which can't apply non-partition filters should not pick any +* predicates. +* If a partitionable table source which can apply non-partition filters should check and pick +* only predicates this table source can support. +* +* After trying to push pruned-partitions and predicates down, we should return a new +* [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates. +* Even if we actually pushed nothing down, it is recommended that we still return a new +* [[TableSource]] instance since we will mark the returned instance as filter push down has +* been tried. +* +* We also should note to not changing the form of the predicates passed in. It has been +* organized in CNF conjunctive form, and we should only take or leave each element from the +* list. Don't try to reorganize the predicates if you are absolutely confident with that. +* +* @param partitionPruned Whether partition pruning is applied. +* @param prunedPartitions Remaining partitions after partition pruning applied. 
+* Notes: If partition pruning is not applied, prunedPartitions is empty. +* @param predicates A list contains conjunctive predicates, you should pick and remove all +* expressions that can be pushed down. The remaining elements of this +* list will further evaluated by framework. +* @return A new cloned instance of [[TableSource]]. +
[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139048604 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{ArrayList => JArrayList, List => JList} + +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner} + +import scala.collection.JavaConverters._ + +/** + * A [[TableSource]] extending this class is a partition table, + * and will get the relevant partitions about the query. + * + * @tparam T The return type of the [[TableSource]]. 
+ */ +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] { + + private var relBuilder: Option[RelBuilder] = None + + /** +* Get all partitions belong to this table +* +* @return All partitions belong to this table +*/ + def getAllPartitions: JList[Partition] + + /** +* Get partition field names. +* +* @return Partition field names. +*/ + def getPartitionFieldNames: Array[String] + + /** +* Get partition field types. +* +* @return Partition field types. +*/ + def getPartitionFieldTypes: Array[TypeInformation[_]] + + /** +* Whether drop partition predicates after apply partition pruning. +* +* @return true only if the result is correct without partition predicate +*/ + def supportDropPartitionPredicate: Boolean = false + + /** +* @return Pruned partitions +*/ + def getPrunedPartitions: JList[Partition] + + /** +* @return True if apply partition pruning +*/ + def isPartitionPruned: Boolean + + /** +* If a partitionable table source which can't apply non-partition filters should not pick any +* predicates. +* If a partitionable table source which can apply non-partition filters should check and pick +* only predicates this table source can support. +* +* After trying to push pruned-partitions and predicates down, we should return a new +* [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates. +* Even if we actually pushed nothing down, it is recommended that we still return a new +* [[TableSource]] instance since we will mark the returned instance as filter push down has +* been tried. +* +* We also should note to not changing the form of the predicates passed in. It has been +* organized in CNF conjunctive form, and we should only take or leave each element from the +* list. Don't try to reorganize the predicates if you are absolutely confident with that. +* +* @param partitionPruned Whether partition pruning is applied. +* @param prunedPartitions Remaining partitions after partition pruning applied. 
+* Notes: If partition pruning is not applied, prunedPartitions is empty. +* @param predicates A list contains conjunctive predicates, you should pick and remove all +* expressions that can be pushed down. The remaining elements of this +* list will further evaluated by framework. +* @return A new cloned instance of [[TableSource]]. +*/ + def applyPrunedPartitionsAndPredicate( +partitionPruned: Boolean, +prunedPartitions: JList[Partition], +predicates: JList[Expression]): TableSource[T] + + + /** +* Check and pick
[jira] [Commented] (FLINK-4047) Fix documentation about determinism of KeySelectors
[ https://issues.apache.org/jira/browse/FLINK-4047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167214#comment-16167214 ] ASF GitHub Bot commented on FLINK-4047: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/4659#discussion_r139048412 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java --- @@ -36,7 +37,7 @@ public interface KeySelector<IN, KEY> extends Function, Serializable { /** -* User-defined function that extracts the key from an arbitrary object. +* User-defined function that extracts the key from an deterministic object. --- End diff -- Okay. Thanks. I will correct it soon. > Fix documentation about determinism of KeySelectors > --- > > Key: FLINK-4047 > URL: https://issues.apache.org/jira/browse/FLINK-4047 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.1.0, 1.0.3 >Reporter: Fabian Hueske >Priority: Blocker > Labels: starter > Fix For: 1.4.0 > > > KeySelectors must return deterministic keys, i.e., if invoked multiple times > on the same object, the returned key must be the same. > The documentation about this aspect is broken ("The key can be of any type > and be derived from arbitrary computations."). > We need to fix the JavaDocs of the {{KeySelector}} interface and the web > documentation > (https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#specifying-keys). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4659: [FLINK-4047] Fix documentation about determinism o...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/4659#discussion_r139048412 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java --- @@ -36,7 +37,7 @@ public interface KeySelector<IN, KEY> extends Function, Serializable { /** -* User-defined function that extracts the key from an arbitrary object. +* User-defined function that extracts the key from an deterministic object. --- End diff -- Okay. Thanks. I will correct it soon. ---
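The determinism requirement from FLINK-4047 can be illustrated with a small, self-contained Java sketch. `SimpleKeySelector` is a simplified stand-in defined here for illustration, not Flink's actual `KeySelector` interface, and both selectors are hypothetical examples. The first derives the key only from the input value; the second depends on hidden mutable state, so two calls on the same object return different keys.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

final class KeySelectorDeterminism {
    // Simplified stand-in for a key selector (assumption made for this sketch).
    interface SimpleKeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    // Deterministic: the key depends only on the input value.
    static final SimpleKeySelector<String, String> BY_LOWER_CASE =
        value -> value.toLowerCase(Locale.ROOT);

    private static final AtomicInteger CALLS = new AtomicInteger();

    // Non-deterministic: the key depends on how often the selector was called.
    static final SimpleKeySelector<String, Integer> BY_CALL_COUNT =
        value -> CALLS.incrementAndGet();

    public static void main(String[] args) {
        // Same input, same key on every invocation.
        System.out.println(BY_LOWER_CASE.getKey("Flink").equals(BY_LOWER_CASE.getKey("Flink")));
        // Same input, but a different key on each invocation.
        System.out.println(BY_CALL_COUNT.getKey("Flink").equals(BY_CALL_COUNT.getKey("Flink")));
    }
}
```

A selector like the second one would send equal records to different keys, which is the behavior the corrected documentation is meant to rule out.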
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167213#comment-16167213 ] ASF GitHub Bot commented on FLINK-5859: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139048337 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -55,4 +55,9 @@ trait FilterableTableSource[T] { */ def isFilterPushedDown: Boolean + /** +* @param relBuilder Builder for relational expressions. +*/ + def setRelBuilder(relBuilder: RelBuilder): Unit --- End diff -- Can you move this method to PartitionableTableSource? > support partition pruning on Table API & SQL > > > Key: FLINK-5859 > URL: https://issues.apache.org/jira/browse/FLINK-5859 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > Many data sources are partitionable storage, e.g. HDFS, Druid. And many > queries just need to read a small subset of the total data. We can use > partition information to prune or skip over files irrelevant to the user’s > queries. Both query optimization time and execution time can be reduced > obviously, especially for a large partitioned table. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/4667#discussion_r139048337 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -55,4 +55,9 @@ trait FilterableTableSource[T] { */ def isFilterPushedDown: Boolean + /** +* @param relBuilder Builder for relational expressions. +*/ + def setRelBuilder(relBuilder: RelBuilder): Unit --- End diff -- Can you move this method to PartitionableTableSource? ---
[jira] [Closed] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin
[ https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-7617. - Resolution: Fixed Fix Version/s: 1.3.3 1.4.0 > Remove string format in BitSet to improve the performance of > BuildSideOuterjoin > --- > > Key: FLINK-7617 > URL: https://issues.apache.org/jira/browse/FLINK-7617 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Jingsong Lee >Assignee: Jingsong Lee > Fix For: 1.4.0, 1.3.3 > > > When using BuildSideOuterjoin, BitSet.set and BitSet.get are called frequently, and each call runs this check: > Preconditions.checkArgument(index < bitLength && index >= 0, > String.format("Input Index [%d] is larger than BitSet available size [%d].", index, bitLength)); > Because String.format is evaluated on every call, even when the check passes, it causes a sharp decline in performance. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin
[ https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167179#comment-16167179 ] ASF GitHub Bot commented on FLINK-7617: --- Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4668 @fhueske sure > Remove string format in BitSet to improve the performance of > BuildSideOuterjoin > --- > > Key: FLINK-7617 > URL: https://issues.apache.org/jira/browse/FLINK-7617 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > When using BuildSideOuterjoin, BitSet.set and BitSet.get are called frequently, and each call runs this check: > Preconditions.checkArgument(index < bitLength && index >= 0, > String.format("Input Index [%d] is larger than BitSet available size [%d].", index, bitLength)); > Because String.format is evaluated on every call, even when the check passes, it causes a sharp decline in performance. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/4668 @fhueske sure ---
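The cost described in FLINK-7617 comes from Java evaluating method arguments eagerly: the message string is built before the check runs, even when the index is valid. The sketch below contrasts the two shapes. The class `BoundsCheck` and its local `checkArgument` helper are hypothetical stand-ins written for this example; they are not Flink's `BitSet` or `Preconditions`, and the fix actually merged in PR #4668 may look different.

```java
final class BoundsCheck {
    // Local stand-in for a precondition helper (assumption for this sketch).
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Eager variant: String.format runs on every call, valid index or not.
    static void checkEager(int index, int bitLength) {
        checkArgument(index >= 0 && index < bitLength,
            String.format("Input index [%d] is out of range for size [%d].", index, bitLength));
    }

    // Lazy variant: the message is only formatted on the failure path.
    static void checkLazy(int index, int bitLength) {
        if (index < 0 || index >= bitLength) {
            throw new IllegalArgumentException(
                String.format("Input index [%d] is out of range for size [%d].", index, bitLength));
        }
    }

    public static void main(String[] args) {
        checkEager(3, 8); // passes, but still pays for String.format
        checkLazy(3, 8);  // passes without building a message
        try {
            checkLazy(8, 8);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a hot loop such as a join probing a bit set per record, the lazy shape keeps the diagnostic message while removing the per-call formatting work.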
[jira] [Commented] (FLINK-3991) Remove deprecated configuration keys from ConfigConstants
[ https://issues.apache.org/jira/browse/FLINK-3991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167060#comment-16167060 ] Chetna Chaudhari commented on FLINK-3991: - Is it still open? Can I work on it? > Remove deprecated configuration keys from ConfigConstants > - > > Key: FLINK-3991 > URL: https://issues.apache.org/jira/browse/FLINK-3991 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Robert Metzger > Fix For: 2.0.0 > > > In > https://github.com/apache/flink/commit/b0acd97935cd21843bac3b9b5afa3662b52bb95d#diff-40616c4678c3fbfe07c0701505ce0567 > I deprecated some configuration keys. > They are unused and need to be removed with the 2.0 release. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4673 Hi @dawidwys, I just found an issue in the after-match feature that causes AfterMatchSkipStrategy to not work. It may have been introduced while resolving merge conflicts. I fixed it in this PR and added a simple test to catch such problems. Please merge this into the repository. Thanks. ---
[jira] [Commented] (FLINK-7622) Respect local KPL queue size in FlinkKinesisProducer when adding records to KPL client
[ https://issues.apache.org/jira/browse/FLINK-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166925#comment-16166925 ] Bowen Li commented on FLINK-7622: - [~tzulitai] I think FLINK-7508 indirectly solves this issue, since FLINK-7508 greatly increased the throughput of Kinesis producer. Here's an example: https://imgur.com/a/u198h Here are the metrics of our prod Flink job - UserRecords Put (aka UserRecords put into KPL queue), and user_records_pending (aka outstanding UserRecords waiting to be sent to AWS in the queue). When using per_request threading model, a small number (~0.5million) of UserRecords can cause huge number (~15k) of records pending because the throughput is so low. After switching to pooled threading model, you can see the number of outstanding UserRecords has dropped significantly (~0) even though the number of UserRecords put into the queue grow to 16X bigger (~8million at peak). Take a closer look at our user_records_pending metric for the past two weeks at https://imgur.com/a/2YxIm, the # of outstanding records is consistently under 150 (impressive, right?). Thus, I believe propagating back pressure to upstream for FlinkKinesisProducer is not necessary anymore. > Respect local KPL queue size in FlinkKinesisProducer when adding records to > KPL client > -- > > Key: FLINK-7622 > URL: https://issues.apache.org/jira/browse/FLINK-7622 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai > > This issue was brought to discussion by [~sthm] offline. > Currently, records are added to the Kinesis KPL producer client without > checking the number of outstanding records within the local KPL queue. This > manner is basically neglecting backpressure when producing to Kinesis through > KPL, and can therefore exhaust system resources. 
> We should respect {{producer.getOutstandingRecordsCount()}} as a measure of > backpressure, and propagate backpressure upstream by blocking further sink > invocations when some threshold of outstanding record count is exceeded. The > recommended threshold [1] seems to be 10,000. > [1] > https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
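The blocking approach proposed in FLINK-7622 could be sketched roughly as follows. This is a self-contained illustration only: `QueueLimiter` is a hypothetical class, and the `IntSupplier` stands in for a call such as `producer.getOutstandingRecordsCount()` so the example runs without the KPL on the classpath. The limit of 10,000 used in `main` is the recommended threshold quoted in the issue.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical helper; not part of FlinkKinesisProducer.
final class QueueLimiter {
    private final IntSupplier outstandingRecords; // stand-in for the producer's outstanding count
    private final int queueLimit;

    QueueLimiter(IntSupplier outstandingRecords, int queueLimit) {
        this.outstandingRecords = outstandingRecords;
        this.queueLimit = queueLimit;
    }

    // Blocks the caller while the outstanding count exceeds the limit.
    // Returns how many times it had to wait, which is handy for metrics.
    int awaitCapacity(long pollIntervalMillis) {
        int waits = 0;
        while (outstandingRecords.getAsInt() > queueLimit) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for queue capacity", e);
            }
            waits++;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Simulated queue that drains by one record per poll.
        AtomicInteger simulatedQueue = new AtomicInteger(10_003);
        QueueLimiter limiter = new QueueLimiter(simulatedQueue::getAndDecrement, 10_000);
        System.out.println("waited " + limiter.awaitCapacity(1) + " times");
    }
}
```

Calling something like `awaitCapacity` at the start of each sink invocation is what would propagate backpressure upstream: the sink stops accepting records until the local queue has drained below the threshold.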
[jira] [Commented] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166783#comment-16166783 ] ASF GitHub Bot commented on FLINK-7508: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4656 @tzulitai Thanks, Gordon! I watched your presentation on "managing state of Flink", and you did a great job explaining all details. I added an option for PER_REQUEST model since it doesn't hurt anything. This PR is for 1.4. There's [another PR here](https://github.com/apache/flink/pull/4657) I think need to be merged to both 1.4 and 1.3.x > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 > Benchmarking I did: > * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR > cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink > job generates about 21million UserRecords, which means that we generated a > test load of 21million UserRecords at the first minute of each hour. 
> * Criteria: Test KPL throughput per minute. Since the default RecordTTL for > KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL > within a minute, or we will see UserRecord expiration errors. > * One-New-Thread-Per-Request model: max throughput is about 2million > UserRecords per min; it doesn't go beyond that because CPU utilization goes > to 100%, everything stopped working and that Flink job crashed. > * Thread-Pool model with pool size of 10: it sends out 21million UserRecords > within 30 sec without any UserRecord expiration errors. The average peak CPU > utilization is about 20% - 30%. So 21million UserRecords/min is not the max > throughput of thread-pool model. We didn't go any further because 1) this > throughput is already a couple times more than what we really need, and 2) we > don't have a quick way of increasing the test load > Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. > [~tzulitai] What do you think -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer to use ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4656 @tzulitai Thanks, Gordon! I watched your presentation on "managing state of Flink", and you did a great job explaining all the details. I added an option for the PER_REQUEST model since it doesn't hurt anything. This PR is for 1.4. There's [another PR here](https://github.com/apache/flink/pull/4657) that I think needs to be merged to both 1.4 and 1.3.x ---
[jira] [Commented] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166709#comment-16166709 ] ASF GitHub Bot commented on FLINK-7508: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4656#discussion_r138965647 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -171,8 +171,9 @@ public void open(Configuration parameters) throws Exception { super.open(parameters); // check and pass the configuration properties - KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps); + KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps); producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); + producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED); --- End diff -- On a second thought for this. Though I believe POOLED model is the best option for most of the use cases I can think of, we should give users the flexibility to make decisions. Adding PER_REQUEST model > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. 
> 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 > Benchmarking I did: > * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR > cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink > job generates about 21million UserRecords, which means that we generated a > test load of 21million UserRecords at the first minute of each hour. > * Criteria: Test KPL throughput per minute. Since the default RecordTTL for > KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL > within a minute, or we will see UserRecord expiration errors. > * One-New-Thread-Per-Request model: max throughput is about 2million > UserRecords per min; it doesn't go beyond that because CPU utilization goes > to 100%, everything stopped working and that Flink job crashed. > * Thread-Pool model with pool size of 10: it sends out 21million UserRecords > within 30 sec without any UserRecord expiration errors. The average peak CPU > utilization is about 20% - 30%. So 21million UserRecords/min is not the max > throughput of thread-pool model. We didn't go any further because 1) this > throughput is already a couple times more than what we really need, and 2) we > don't have a quick way of increasing the test load > Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. > [~tzulitai] What do you think -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4656#discussion_r138965647 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -171,8 +171,9 @@ public void open(Configuration parameters) throws Exception { super.open(parameters); // check and pass the configuration properties - KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps); + KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps); producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); + producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED); --- End diff -- On second thought: although I believe the POOLED model is the best option for most of the use cases I can think of, we should give users the flexibility to decide. I'm adding the PER_REQUEST model as an option. ---
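One way to default to the pooled model while still letting users opt back into per-request threading is to read the choice from the producer properties. The sketch below is an assumption-laden illustration, not the code in PR #4656: the class `ThreadingModelOption`, the local `ThreadingModel` enum (a stand-in for `KinesisProducerConfiguration.ThreadingModel`), and the property key `"ThreadingModel"` are all hypothetical names chosen so the example is self-contained.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper; names and property key are assumptions for this sketch.
final class ThreadingModelOption {
    // Local stand-in for the KPL threading model enum.
    enum ThreadingModel { PER_REQUEST, POOLED }

    static final String KEY = "ThreadingModel";

    // Returns the user's choice, or POOLED when nothing is configured.
    // An unknown value fails fast with IllegalArgumentException from valueOf.
    static ThreadingModel resolve(Properties configProps) {
        String raw = configProps.getProperty(KEY);
        if (raw == null) {
            return ThreadingModel.POOLED;
        }
        return ThreadingModel.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props)); // no setting, falls back to the pooled default
        props.setProperty(KEY, "per_request");
        System.out.println(resolve(props)); // explicit opt-in to per-request threading
    }
}
```

The resolved value would then be passed to the producer configuration in place of the hard-coded `POOLED` constant shown in the diff.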
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166633#comment-16166633 ] ASF GitHub Bot commented on FLINK-7567: --- Github user mlipkovich commented on the issue: https://github.com/apache/flink/pull/4655 The local build works fine. If you mean `mvn verify` by compatibility plugin it also worked with no issues > DataStream#iterate() on env.fromElements() / env.fromCollection() does not > work > --- > > Key: FLINK-7567 > URL: https://issues.apache.org/jira/browse/FLINK-7567 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.2 > Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2 >Reporter: Peter Ertl >Assignee: Mikhail Lipkovich > > When I try to execute this simple snippet of code > {code} > @Test > def iterateOnElements(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > // do something silly just do get iteration going ... > val result = env.fromElements(1, 2, 3).iterate(it => { > (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) > }) > result.print() > env.execute() > } > {code} > I get the following exception: > {code} > java.lang.UnsupportedOperationException: Parallelism of the feedback stream > must match the parallelism of the original stream. 
Parallelism of original > stream: 1; parallelism of feedback stream: 8 > at > org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87) > at > org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77) > at > org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519) > at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} > Since is just the simplest iterating stream setup I could imagine this error > makes no sense to me :-P -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4655: [FLINK-7567]: Removed keepPartitioning parameter from ite...
Github user mlipkovich commented on the issue: https://github.com/apache/flink/pull/4655 The local build works fine. If by "compatibility plugin" you mean `mvn verify`, that also ran with no issues. ---
[jira] [Commented] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata
[ https://issues.apache.org/jira/browse/FLINK-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166576#comment-16166576 ] Elias Levy commented on FLINK-4814: --- Yes please. That would be helpful. I would suggest multiple levels of overriding. E.g. the cluster config can provide defaults, as it does now for some things. You should be able to package a config file in your job's resources that overrides the cluster's defaults. You should also be able to pass the job a config file at run time to override values in both of those. And command line arguments and programmatically set values should override everything else. It would be good for the job to print its configuration on startup to aid debugging of settings, like the Kafka producer/consumer do. Also, as I mentioned on the mailing list, the mini cluster used in the local execution environment doesn't load a config file, so there are things, like external checkpoints, that can't be tested at the moment from within an IDE. API-wise it could also be more consistent. At the moment you configure some settings via the stream environment (e.g. {{env.setStreamTimeCharacteristic()}}), others via {{env.getConfig.foo}}, more via {{env.getCheckpointConfig.bar}}, and some only via the config file. > Remove extra storage location for externalized checkpoint metadata > -- > > Key: FLINK-4814 > URL: https://issues.apache.org/jira/browse/FLINK-4814 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi > > Follow up for FLINK-4512. > Store checkpoint meta data in checkpoint directory. That makes it simpler > for users to track and clean up checkpoints manually, if they want to retain > externalized checkpoints across cancellations and terminal failures. > Every state backend needs to be able to provide a storage location for the > checkpoint metadata. 
The memory state backend would hence not work with > externalized checkpoints, unless one sets explicitly a parameter > `setExternalizedCheckpointsLocation(uri)`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
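The override order suggested in Elias Levy's comment above (cluster defaults, then a config file packaged with the job, then a config file passed at run time, then command-line and programmatic values) can be sketched as a simple ordered merge. This is an illustration only, not an existing Flink API: the class name, the `layer` helper, and all keys such as `example.parallelism` are made up for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LayeredConfig {

    /** Merges layers from lowest to highest precedence; later layers override earlier ones. */
    public static Map<String, String> merge(List<Map<String, String>> layersLowToHigh) {
        Map<String, String> effective = new TreeMap<>(); // sorted so the startup dump is stable
        for (Map<String, String> layer : layersLowToHigh) {
            effective.putAll(layer);
        }
        return effective;
    }

    /** Builds one layer from alternating key/value arguments (hypothetical helper). */
    public static Map<String, String> layer(String... keyValues) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            m.put(keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        // All keys and values below are invented for illustration.
        Map<String, String> cluster = layer("example.parallelism", "4", "example.checkpoint.dir", "/cluster/default");
        Map<String, String> jobResources = layer("example.parallelism", "8");
        Map<String, String> runtimeFile = layer("example.checkpoint.dir", "/job/override");
        Map<String, String> cliAndProgram = layer("example.parallelism", "16");

        Map<String, String> effective =
                merge(Arrays.asList(cluster, jobResources, runtimeFile, cliAndProgram));

        // Printing the effective configuration on startup, as the comment proposes.
        effective.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Keeping the merge as a plain ordered fold makes the precedence rule easy to state: the last layer that defines a key wins.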
[GitHub] flink issue #4658: [Flink-7596][Table API & SQL] fix bug when Set Operation ...
Github user Xpray commented on the issue: https://github.com/apache/flink/pull/4658 @twalthr, I've rewritten the methods; please have a look. ---
[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)
[ https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166527#comment-16166527 ] Steve Loughran commented on FLINK-7589: --- bq. is there any plan to make Flink use AWS S3 SDK directly rather than Hadoop's S3 implementation? The topic has surfaced. Note that if you do want a client which is resilient to transient failures, that's a fair amount of extra work, and you have to implement a fault-injecting wrapper around the AWS SDK to actually verify that it works in a functional test suite. That's something to consider shipping so that people can use it in their own integration tests. > org.apache.http.ConnectionClosedException: Premature end of Content-Length > delimited message body (expected: 159764230; received: 64638536) > --- > > Key: FLINK-7589 > URL: https://issues.apache.org/jira/browse/FLINK-7589 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > When I tried to resume a Flink job from a savepoint with different > parallelism, I ran into this error. And the resume failed. > {code:java} > 2017-09-05 21:53:57,317 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> > Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to > FAILED. 
> org.apache.http.ConnectionClosedException: Premature end of Content-Length > delimited message body (expected: 159764230; received: 64638536 > at > org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180) > at > org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at > com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160) > at java.io.DataInputStream.read(DataInputStream.java:149) > at > org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72) > at > org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61) > at > org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47) > at java.io.DataInputStream.readFully(DataInputStream.java:195) > at java.io.DataInputStream.readLong(DataInputStream.java:416) > at > org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68) > at > org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user Aitozi commented on the issue: https://github.com/apache/flink/pull/4665 @aljoscha I agree that the name should be "numLateElementsDropped". Do you mean that my result should subtract the number of late elements that are skipped but go to the side output? ---
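The distinction raised in the question above, late elements that are truly dropped versus late elements that are skipped but still emitted to a side output, can be sketched with two separate counters. This is a simplified illustration under stated assumptions, not the code in the pull request: the class and field names are hypothetical, and lateness is reduced to the check `windowMaxTimestamp + allowedLateness <= currentWatermark`.

```java
public class LateElementCounter {

    private final boolean sideOutputEnabled;
    private long numLateElementsDropped;
    private long numLateElementsSideOutput;

    public LateElementCounter(boolean sideOutputEnabled) {
        this.sideOutputEnabled = sideOutputEnabled;
    }

    /**
     * Returns true if the element is on time and should be processed by the window.
     * A late element is counted as dropped only when no side output receives it.
     */
    public boolean onElement(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        boolean late = windowMaxTimestamp + allowedLateness <= currentWatermark;
        if (!late) {
            return true;
        }
        if (sideOutputEnabled) {
            numLateElementsSideOutput++; // skipped by the window, but not lost
        } else {
            numLateElementsDropped++;    // skipped and lost
        }
        return false;
    }

    public long getNumLateElementsDropped() {
        return numLateElementsDropped;
    }

    public long getNumLateElementsSideOutput() {
        return numLateElementsSideOutput;
    }

    public static void main(String[] args) {
        LateElementCounter counter = new LateElementCounter(false);
        counter.onElement(10L, 0L, 20L); // late, no side output
        counter.onElement(30L, 0L, 20L); // on time
        System.out.println("dropped = " + counter.getNumLateElementsDropped());
    }
}
```

With this split, a metric named "numLateElementsDropped" does not need any subtraction, because side-output elements never increment it.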
[GitHub] flink pull request #4673: [hotfix] [cep] Fix afterMatchStrategy parameter mi...
GitHub user yestinchen opened a pull request: https://github.com/apache/flink/pull/4673 [hotfix] [cep] Fix afterMatchStrategy parameter missing issue ## What is the purpose of the change Fix the missing afterMatchSkipStrategy parameter when calling the `nfa.process()` function. This issue may have been introduced while resolving merge conflicts. ## Brief change log ## Verifying this change This change added tests and can be verified as follows: added an after-match test case to `CEPITCase` to check for this kind of problem. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yestinchen/flink aftermatch-hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4673.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4673 commit 060dcde4bd9662c98ade061c1698fcff5bb702b3 Author: Yestin <873915...@qq.com> Date: 2017-09-14T16:01:53Z [hotfix] [cep] Fix afterMatchStrategy parameter missing issue This issue may be introduced during merging conflicts. ---
[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)
[ https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166515#comment-16166515 ] Bowen Li commented on FLINK-7589: - [~aljoscha] Yes, the job restarted successfully. Just curious - is there any plan to make Flink use AWS S3 SDK directly rather than Hadoop's S3 implementation?
[GitHub] flink issue #4658: [Flink-7596][Table API & SQL] fix bug when Set Operation ...
Github user Xpray commented on the issue: https://github.com/apache/flink/pull/4658 What about overriding the `leastRestrictiveSqlType` method in `FlinkTypeFactory`? ---
[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166443#comment-16166443 ] Erik van Oosten commented on FLINK-4796: I am not sure why this is marked as a duplicate. The problem here is inconsistent handling of the runtime context inside the different layers under FlinkKafkaProducer: method {{getRuntimeContext}} gives {{null}} even though {{setRuntimeContext}} was called. How does that relate to the addition of a new interface? > Add new Sink interface with access to more meta data > > > Key: FLINK-4796 > URL: https://issues.apache.org/jira/browse/FLINK-4796 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Aljoscha Krettek > > The current {{SinkFunction}} cannot access the timestamps of elements which > resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other > limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} > and not a {{SinkFunction}}. > We should add a new interface for sinks that takes a context parameter, > similar to {{ProcessFunction}}. This will allow sinks to query additional > meta data about the element that they're receiving. > This is one ML thread where a user ran into a problem caused by this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635 > h3. Original Text (that is still valid but not general) > The Kafka 0.10 connector supports writing event timestamps to Kafka. > Currently, the regular DataStream APIs don't allow user code to access the > event timestamp easily. That's why the Kafka connector is using a custom > operator ({{transform()}}) to access the event time. > With this JIRA, I would like to provide the event timestamp in the regular > DataStream APIs. 
> Once I'll look into the issue, I'll post some proposals how to add the > timestamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4658: [Flink-7596][Table API & SQL] fix bug when Set Operation ...
Github user Xpray commented on the issue: https://github.com/apache/flink/pull/4658 Thanks for your advice. I am considering a better way to solve this. ---
[jira] [Commented] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata
[ https://issues.apache.org/jira/browse/FLINK-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166428#comment-16166428 ] Aljoscha Krettek commented on FLINK-4814: - Yes I think this is still a problem and I think it's part of a larger problem that I'm trying to address in this one-pager: https://docs.google.com/document/d/1Q8hkpl2VVXrbEaJkQNe88kuus0F5wFFiebub9hqrg1s/edit?usp=sharing I'm just laying out what I would like to have, no solutions yet, though. What do you think?
[jira] [Commented] (FLINK-4047) Fix documentation about determinism of KeySelectors
[ https://issues.apache.org/jira/browse/FLINK-4047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166401#comment-16166401 ] ASF GitHub Bot commented on FLINK-4047: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4659#discussion_r138918167 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java --- @@ -36,7 +37,7 @@ public interface KeySelector<IN, KEY> extends Function, Serializable { /** -* User-defined function that extracts the key from an arbitrary object. +* User-defined function that extracts the key from an deterministic object. --- End diff -- I think this should be `User-defined function that deterministically extracts the key from an object.` > Fix documentation about determinism of KeySelectors > --- > > Key: FLINK-4047 > URL: https://issues.apache.org/jira/browse/FLINK-4047 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.1.0, 1.0.3 >Reporter: Fabian Hueske >Priority: Blocker > Labels: starter > Fix For: 1.4.0 > > > KeySelectors must return deterministic keys, i.e., if invoked multiple times > on the same object, the returned key must be the same. > The documentation about this aspect is broken ("The key can be of any type > and be derived from arbitrary computations."). > We need to fix the JavaDocs of the {{KeySelector}} interface and the web > documentation > (https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#specifying-keys). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4659: [FLINK-4047] Fix documentation about determinism o...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4659#discussion_r138918167 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java --- @@ -36,7 +37,7 @@ public interface KeySelector<IN, KEY> extends Function, Serializable { /** -* User-defined function that extracts the key from an arbitrary object. +* User-defined function that extracts the key from an deterministic object. --- End diff -- I think this should be `User-defined function that deterministically extracts the key from an object.` ---
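To make the determinism requirement discussed above concrete, here is a small self-contained sketch contrasting a deterministic key selector with a non-deterministic one. It is an illustration only: the nested `KeySelector` interface is a simplified local stand-in for Flink's `KeySelector<IN, KEY>` (the `throws Exception` clause is omitted for brevity), and `isStableFor` is a hypothetical helper, not part of Flink.

```java
import java.util.concurrent.ThreadLocalRandom;

public class DeterministicKeys {

    /** Simplified local stand-in for Flink's KeySelector<IN, KEY> (assumption). */
    public interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    /** Deterministic: the same input always yields the same key. */
    public static final KeySelector<String, Integer> BY_LENGTH = value -> value.length();

    /** Non-deterministic: repeated calls on the same input may yield different keys. */
    public static final KeySelector<String, Integer> RANDOM =
            value -> ThreadLocalRandom.current().nextInt();

    /** Hypothetical check: calls the selector repeatedly and compares the results. */
    public static <IN, KEY> boolean isStableFor(KeySelector<IN, KEY> selector, IN value, int attempts) {
        KEY first = selector.getKey(value);
        for (int i = 1; i < attempts; i++) {
            if (!first.equals(selector.getKey(value))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("BY_LENGTH stable: " + isStableFor(BY_LENGTH, "flink", 5));
        System.out.println("RANDOM stable: " + isStableFor(RANDOM, "flink", 5));
    }
}
```

A selector like `RANDOM` would break keyed operations, since the same record could be routed to a different key each time the selector is invoked.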
[jira] [Closed] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-4796. --- Resolution: Duplicate
[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166394#comment-16166394 ] Aljoscha Krettek commented on FLINK-4796: - Ah I see, the workaround is for the NPE.
[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166391#comment-16166391 ] Aljoscha Krettek commented on FLINK-4796: - Sorry, I didn't get what this workaround is for. Could you please go into a bit more detail?
[jira] [Assigned] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-4796: --- Assignee: Aljoscha Krettek
[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166387#comment-16166387 ] Aljoscha Krettek commented on FLINK-2646: - I thought about this and the new {{closeAfterFailure()}} should not actually call {{close()}} by default because this would force implementations that only want "close" behaviour, i.e. "close successfully", to also implement an empty {{closeAfterFailure()}} to prevent the default {{close()}} invocation. This can easily be forgotten and would lead to wrong results. > Rich functions should provide a method "closeAfterFailure()" > > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen > Fix For: 1.0.0 > > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of canceling or error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is then changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}} the change would not be API > breaking. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
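The variant argued for in the comment above, where {{closeAfterFailure()}} defaults to a no-op instead of delegating to {{close()}}, might look roughly like the sketch below. This is an assumption-laden illustration and not Flink code: `LifecycleFunction`, `Recording`, and `execute` are hypothetical names, and the tiny "runtime" swallows the failure for brevity where a real one would propagate it.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseHooks {

    /** Hypothetical stand-in for the close-related part of a rich function. */
    public interface LifecycleFunction {
        /** Called after regular, successful completion. */
        void close();

        /** Called after an irregular exit; does NOT call close() by default. */
        default void closeAfterFailure() { }
    }

    /** Test double that records which hook was invoked. */
    public static class Recording implements LifecycleFunction {
        public final List<String> calls = new ArrayList<>();

        @Override
        public void close() {
            calls.add("close");
        }

        @Override
        public void closeAfterFailure() {
            calls.add("closeAfterFailure");
        }
    }

    /** Minimal "runtime": picks exactly one hook depending on how the body ended. */
    public static void execute(LifecycleFunction fn, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            fn.closeAfterFailure(); // a real runtime would also rethrow or report the failure
            return;
        }
        fn.close();
    }

    public static void main(String[] args) {
        Recording ok = new Recording();
        execute(ok, () -> { });
        Recording failed = new Recording();
        execute(failed, () -> { throw new IllegalStateException("simulated failure"); });
        System.out.println(ok.calls + " " + failed.calls);
    }
}
```

With a no-op default, a function that only implements `close()` commits its work on success and does nothing on failure, which is the behaviour the comment wants to get without extra boilerplate.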
[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)
[ https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166370#comment-16166370 ] Aljoscha Krettek commented on FLINK-7589: - Yes, I think we cannot do much except wait here. [~phoenixjiangnan] This was a transient failure and a subsequent restore succeeded, right? > org.apache.http.ConnectionClosedException: Premature end of Content-Length > delimited message body (expected: 159764230; received: 64638536) > --- > > Key: FLINK-7589 > URL: https://issues.apache.org/jira/browse/FLINK-7589 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > When I tried to resume a Flink job from a savepoint with different > parallelism, I ran into this error. And the resume failed. > {code:java} > 2017-09-05 21:53:57,317 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> > Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to > FAILED. 
> org.apache.http.ConnectionClosedException: Premature end of Content-Length > delimited message body (expected: 159764230; received: 64638536 > at > org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180) > at > org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at > com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77) > at java.io.FilterInputStream.read(FilterInputStream.java:133) > at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160) > at java.io.DataInputStream.read(DataInputStream.java:149) > at > org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72) > at > org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61) > at > org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47) > at java.io.DataInputStream.readFully(DataInputStream.java:195) > at java.io.DataInputStream.readLong(DataInputStream.java:416) > at > org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68) > at > org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104) > at > 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin
[ https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166369#comment-16166369 ] ASF GitHub Bot commented on FLINK-7617: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4668 Hi @KurtYoung, do you want to merge this to the `release-1.3` branch as well? > Remove string format in BitSet to improve the performance of > BuildSideOuterjoin > --- > > Key: FLINK-7617 > URL: https://issues.apache.org/jira/browse/FLINK-7617 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > When using BuildSideOuterjoin, will frequently call Bitset.set and get, there > will be > Preconditions.checkArgument (index = 0, > String.format ("Input Index [% d] is larger than BitSet available size [% > d].", Index, bitLength)); > Of the check, String.format will lead to a sharp decline performance. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
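The cost described in the issue comes from Java evaluating the `String.format(...)` argument before the check runs, so the message is built on every call even when the index is valid. A minimal sketch of the alternative follows. `SimpleBitSet` is a hypothetical class, not Flink's `BitSet`, and the bounds condition is an assumption because the condition in the quoted snippet is garbled.

```java
/** Hypothetical minimal bit set; names and layout are illustrative, not Flink's BitSet. */
class SimpleBitSet {
    private final byte[] bytes;
    private final int bitLength;

    SimpleBitSet(int byteLength) {
        this.bytes = new byte[byteLength];
        this.bitLength = byteLength * 8;
    }

    void set(int index) {
        checkIndex(index);
        bytes[index >>> 3] |= (byte) (1 << (index & 7));
    }

    boolean get(int index) {
        checkIndex(index);
        return (bytes[index >>> 3] & (1 << (index & 7))) != 0;
    }

    /** Formats the message only on the failure path, so valid calls allocate nothing. */
    private void checkIndex(int index) {
        if (index < 0 || index >= bitLength) {
            throw new IllegalArgumentException(String.format(
                "Input index [%d] is outside the BitSet range [0, %d).", index, bitLength));
        }
    }
}
```

An explicit `if` keeps the informative message for the error case while removing the formatting work from the hot path of `set` and `get`.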
[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4668 Hi @KurtYoung, do you want to merge this to the `release-1.3` branch as well? ---
[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
[ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166365#comment-16166365 ] ASF GitHub Bot commented on FLINK-7567: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4655 You can try and run the build locally to have a look at the file generated by the compatibility plugin. Then we can figure out why it's complaining. > DataStream#iterate() on env.fromElements() / env.fromCollection() does not > work > --- > > Key: FLINK-7567 > URL: https://issues.apache.org/jira/browse/FLINK-7567 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.2 > Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2 >Reporter: Peter Ertl >Assignee: Mikhail Lipkovich > > When I try to execute this simple snippet of code > {code} > @Test > def iterateOnElements(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > // do something silly just do get iteration going ... > val result = env.fromElements(1, 2, 3).iterate(it => { > (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) > }) > result.print() > env.execute() > } > {code} > I get the following exception: > {code} > java.lang.UnsupportedOperationException: Parallelism of the feedback stream > must match the parallelism of the original stream. 
Parallelism of original > stream: 1; parallelism of feedback stream: 8 > at > org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87) > at > org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77) > at > org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519) > at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} > Since is just the simplest iterating stream setup I could imagine this error > makes no sense to me :-P -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4655: [FLINK-7567]: Removed keepPartitioning parameter from ite...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4655 You can try and run the build locally to have a look at the file generated by the compatibility plugin. Then we can figure out why it's complaining. ---
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/3511#discussion_r138905728 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java --- @@ -218,7 +221,7 @@ public long addRecord(T record) throws IOException { return offset; } - public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException { + public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException, IllegalAccessException, TemplateException, InstantiationException, CompileException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException { --- End diff -- Or, maybe even better to catch all of these in the sorter factory method (`SorterFactory.createSorter`), and create a non-codegen sorter when any of these exceptions happen (and log a warning). ---
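The fallback suggested in this comment could look roughly like the sketch below. All names here (`IntSorter`, `DefaultIntSorter`, `FallbackSorterFactory`) are hypothetical and not taken from the pull request; the `codegen` callback stands in for whatever generates, compiles, and instantiates the sorter.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sorter abstraction used only for this sketch. */
interface IntSorter {
    void sort(int[] data);

    String kind();
}

/** The non-generated sorter that is always available. */
class DefaultIntSorter implements IntSorter {
    @Override
    public void sort(int[] data) {
        Arrays.sort(data);
    }

    @Override
    public String kind() {
        return "default";
    }
}

class FallbackSorterFactory {
    private static final Logger LOG = Logger.getLogger(FallbackSorterFactory.class.getName());

    /** Tries the code-generating path first and falls back if it fails for any reason. */
    static IntSorter createSorter(Callable<IntSorter> codegen) {
        try {
            return codegen.call();
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Sorter code generation failed; using the non-generated sorter.", e);
            return new DefaultIntSorter();
        }
    }
}
```

Callers then never see a code generation exception: a failure costs performance but not correctness, and the warning in the log records why the generated sorter was not used.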
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/3511#discussion_r138904204 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java --- @@ -115,6 +115,13 @@ public Configuration getConfiguration() { return tmpDirectories; } + public String getFirstTmpDirectory(){ --- End diff -- We can remove this method, since it became unused when we changed to generating the code into just a string instead of a temporary file. ---
[jira] [Assigned] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin
[ https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-7617: --- Assignee: Jingsong Lee > Remove string format in BitSet to improve the performance of > BuildSideOuterjoin > --- > > Key: FLINK-7617 > URL: https://issues.apache.org/jira/browse/FLINK-7617 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Jingsong Lee >Assignee: Jingsong Lee > > When using BuildSideOuterjoin, will frequently call Bitset.set and get, there > will be > Preconditions.checkArgument (index = 0, > String.format ("Input Index [% d] is larger than BitSet available size [% > d].", Index, bitLength)); > Of the check, String.format will lead to a sharp decline performance. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7607) Web Frontend Hangs with Large Numbers of Tasks
[ https://issues.apache.org/jira/browse/FLINK-7607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7607: Fix Version/s: 1.4.0 > Web Frontend Hangs with Large Numbers of Tasks > -- > > Key: FLINK-7607 > URL: https://issues.apache.org/jira/browse/FLINK-7607 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.3.2 > Environment: Attempted to load the web frontend on a MacBook Pro 15" > (late 2016) with 16 GB of memory using both Chrome 60.0 and Safari 10.1.2. >Reporter: Joshua Griffith > Labels: performance > Fix For: 1.4.0 > > > Viewing a job with a high number of tasks in the web front-end causes the > page to hang, consuming 100% CPU on a core. At 200 tasks the page slows > noticeably and scrolling results in long, non-responsive pauses. At 400 tasks > the page only updates once per minute and is almost entirely non-responsive. > Initially, I thought this was caused by rendering a complex job graph but > opening the inspector and deleting the canvas did not improve page > performance. Further inspection indicated that the page was redrawing every > DOM element in the task list on every update. > A possible solution is to use an approach similar to > [react-list|https://github.com/orgsync/react-list] and only request > data/render list items that are in view and only update DOM nodes that have > changed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166340#comment-16166340 ] Aljoscha Krettek commented on FLINK-7606: - Will NFAs not be cleared when the allowed "within" time is over? If this is not the case and NFA state is in fact never cleaned up, then this is a leak and will lead to problems. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7606: Summary: CEP operator leaks state (was: Memory leak on NestedMapsStateTable) > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows up continuously without free the heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set as 5 minutes), indicating that two > incoming messages match the pattern only if they come in a certain time > window. > What we've seen is that for every key present in the message, an NFA object > is instantiated in the NestedMapsStateTable and it is never deallocated. > Also the "within" clause didn't help: we've seen that if we send messages > that don't match the pattern, the memory grows up (I suppose that the state > of NFA is updated) but it is not cleaned also after the 5 minutes of time > window defined in "within" clause. > If you need, I can provide more details about the job we've implemented and > also the screenshots about the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/3511#discussion_r138899420 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java --- @@ -218,7 +221,7 @@ public long addRecord(T record) throws IOException { return offset; } - public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException { + public MutableObjectIterator finishWriteAndSortKeys(List memory) throws IOException, IllegalAccessException, TemplateException, InstantiationException, CompileException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException { --- End diff -- I'm thinking that we should encapsulate all these exceptions into one `SorterCodegenException` (or just `CodegenException`) and throw only that from the sorter factory method, to avoid having to declare this litany of exceptions (this same list occurs at multiple places). ---
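The wrapping proposed in this comment might be sketched as follows. `SorterCodegenException`, `Sorter`, and `GeneratedSorterFactory` are hypothetical names for illustration, not the classes from the pull request. Only the reflection-related exceptions from the list above are caught here; `TemplateException` and `CompileException` come from third-party libraries and would presumably be added to the same catch clause.

```java
/** Hypothetical single exception type for every code generation failure. */
class SorterCodegenException extends Exception {
    SorterCodegenException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical marker interface for a generated sorter. */
interface Sorter {
}

class GeneratedSorterFactory {
    /**
     * Loads and instantiates a generated sorter class by name. Callers declare
     * one exception type instead of the full list of reflection exceptions.
     */
    static Sorter createSorter(String generatedClassName) throws SorterCodegenException {
        try {
            Class<?> clazz = Class.forName(generatedClassName);
            return (Sorter) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // ReflectiveOperationException is the common superclass of
            // ClassNotFoundException, NoSuchMethodException, InstantiationException,
            // IllegalAccessException, and InvocationTargetException.
            throw new SorterCodegenException(
                "Could not instantiate generated sorter " + generatedClassName, e);
        }
    }
}
```

The original cause stays available through `getCause()`, so nothing is lost for debugging, while method signatures such as `finishWriteAndSortKeys` would only need to add one exception type.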
[jira] [Updated] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout
[ https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7590: Priority: Major (was: Blocker) > Flink failed to flush and close the file system output stream for > checkpointing because of s3 read timeout > -- > > Key: FLINK-7590 > URL: https://issues.apache.org/jira/browse/FLINK-7590 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > Flink job failed once over the weekend because of the following issue. It > picked itself up afterwards and has been running well. But the issue might > worth taking a look at. > {code:java} > 2017-09-03 13:18:38,998 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce > (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint > 163 for operator reduce (14/18).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 163 for > operator reduce (14/18). > ... 
6 more > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the > stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not flush and close the file system output stream > to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain > the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 
7 more > Caused by: java.io.IOException: Could not flush and close the file > system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at >
[jira] [Commented] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout
[ https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166333#comment-16166333 ] Aljoscha Krettek commented on FLINK-7590: - Agreed! > Flink failed to flush and close the file system output stream for > checkpointing because of s3 read timeout > -- > > Key: FLINK-7590 > URL: https://issues.apache.org/jira/browse/FLINK-7590 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Flink job failed once over the weekend because of the following issue. It > picked itself up afterwards and has been running well. But the issue might > worth taking a look at. > {code:java} > 2017-09-03 13:18:38,998 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce > (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint > 163 for operator reduce (14/18).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.Exception: Could not materialize checkpoint 163 for > operator reduce (14/18). > ... 
6 more > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the > stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not flush and close the file system output stream > to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain > the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 
7 more > Caused by: java.io.IOException: Could not flush and close the file > system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166323#comment-16166323 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138879343 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) { this.numTotalRequiredBuffers += numRequiredBuffers; - final List segments = new ArrayList<>(numRequiredBuffers); - for (int i = 0 ; i < numRequiredBuffers ; i++) { - segments.add(availableMemorySegments.poll()); - } + redistributeBuffers(); + } + final List segments = new ArrayList<>(numRequiredBuffers); + for (int i = 0 ; i < numRequiredBuffers ; i++) { try { - redistributeBuffers(); - } catch (IOException e) { - if (segments.size() > 0) { - recycleMemorySegments(segments); - } - + segments.add(availableMemorySegments.take()); --- End diff -- I know, I was the one who suggested it, but thinking about the blocking `take()` a bit more and with some more background I acquired over the last weeks, I'm getting the feeling, we should do the request similar to `LocalBufferPool#requestBuffer()` so that if (for some reason) we are waiting forever, we may at least be stopped by the `destroy()` function being called. Or what do you think? 
I'm thinking about something like this:
```
final ArrayList segments = new ArrayList<>(numRequiredBuffers);
try {
    while (segments.size() < numRequiredBuffers) {
        if (isDestroyed) {
            throw new IllegalStateException("Buffer pool is destroyed.");
        }

        final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
        if (segment != null) {
            segments.add(segment);
        }
    }
} catch (Throwable e) {
    recycleMemorySegments(segments);
    ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this behaviour:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
    final int numBuffers = 10;

    NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    MemorySegment segment = globalPool.requestMemorySegment();
    assertNotNull(segment);

    final OneShotLatch isRunning = new OneShotLatch();
    CheckedThread asyncRequest = new CheckedThread() {
        @Override
        public void go() throws Exception {
            isRunning.trigger();
            globalPool.requestMemorySegments(10);
        }
    };
    asyncRequest.start();

    // We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
    // call above. We cannot guarantee this though but make it highly probable:
    isRunning.await();
    Thread.sleep(10);
    globalPool.destroy();

    segment.free();

    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage("destroyed");
    asyncRequest.sync();
}
```
> Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166322#comment-16166322 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138852758 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); --- End diff -- for this test, `msg` is wrong as nothing has been recycled here (yet) > Create a fix size (non rebalancing) buffer 
pool type for the floating buffers
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
> Currently the number of network buffers in {{LocalBufferPool}} for {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a is the number of exclusive buffers for each channel and b is the number of floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fixed-size buffer pool to manage the floating buffers for {{SingleInputGate}}. The exclusive buffers are assigned to {{InputChannel}}s directly.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
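The buffer counts asserted in `testUniformDistributionBounded4` in the diff above can be reproduced with simple arithmetic. The following is only a sketch of that arithmetic, not Flink's redistribution logic: it assumes every local pool has the same bounds (0 required, at most 10 buffers), so the segments still held by the global pool are split evenly. The class and method names are made up for illustration.

```java
// Simplified model of the numbers asserted in the test; NOT the Flink implementation.
// Assumption: all local pools have identical bounds, so the buffers that are not
// handed out via requestMemorySegments() are divided evenly between them.
class UniformDistributionSketch {

    /** Buffers per local pool after `requestedSegments` were taken from the global pool. */
    static int buffersPerPool(int totalBuffers, int requestedSegments, int numPools) {
        int remaining = totalBuffers - requestedSegments;
        return remaining / numPools;
    }

    public static void main(String[] args) {
        System.out.println(buffersPerPool(10, 2, 1)); // 8: one pool, 2 segments requested
        System.out.println(buffersPerPool(10, 2, 2)); // 4: second pool created
        System.out.println(buffersPerPool(10, 4, 2)); // 3: 2 more segments requested
        System.out.println(buffersPerPool(10, 6, 2)); // 2: 2 more segments requested
    }
}
```

Under the same assumption, recycling the three segment lists one after another gives 3, 4 and 5 buffers per pool, which matches the assertions in the second half of the test.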
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166324#comment-16166324 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138891582 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the 
capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } + } + + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently not containing the number of required free
[jira] [Updated] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-7608:
    Priority: Blocker (was: Major)

> LatencyGauge change to histogram metric
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
> Issue Type: Bug
> Components: Metrics
> Reporter: Hai Zhou
> Assignee: Hai Zhou
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> I used slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to export metrics to the log file.
> I found:
> {noformat}
> -- Gauges -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Map.0.latency:
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming Job.Sink- Unnamed.0.latency:
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166327#comment-16166327 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138889161 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the 
capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } --- End diff --

add `finally` with `globalPool.destroy()`
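The review suggestion above (destroy the pool in a `finally` block) can be sketched as follows. To stay self-contained, this uses a hypothetical `FakeGlobalPool` stand-in rather than Flink's `NetworkBufferPool`; only the try/catch/finally shape is the point, and all names here are assumptions made for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class DestroyInFinallySketch {

    /** Hypothetical stand-in for a global buffer pool; not the Flink class. */
    static class FakeGlobalPool {
        private int available;
        private boolean destroyed;

        FakeGlobalPool(int capacity) {
            this.available = capacity;
        }

        List<Integer> requestSegments(int n) throws IOException {
            if (n > available) {
                throw new IOException("Insufficient number of buffers");
            }
            available -= n;
            List<Integer> segments = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                segments.add(i);
            }
            return segments;
        }

        int available() {
            return available;
        }

        void destroy() {
            destroyed = true;
        }

        boolean isDestroyed() {
            return destroyed;
        }
    }

    /** Requests more segments than exist and always destroys the pool afterwards. */
    static FakeGlobalPool requestMoreThanTotal(int numBuffers) {
        FakeGlobalPool pool = new FakeGlobalPool(numBuffers);
        try {
            pool.requestSegments(numBuffers + 1);
            throw new AssertionError("Should throw an IOException");
        } catch (IOException e) {
            // The failed request must not have consumed any segments.
            if (pool.available() != numBuffers) {
                throw new AssertionError("segments leaked after failed request");
            }
        } finally {
            // Runs whether the checks above pass or fail, so the pool is always released.
            pool.destroy();
        }
        return pool;
    }

    public static void main(String[] args) {
        FakeGlobalPool pool = requestMoreThanTotal(10);
        System.out.println(pool.isDestroyed()); // true
    }
}
```

Putting the cleanup in `finally` means a failing assertion in the `catch` block still releases the pool, which is presumably why the reviewer asks for it in each of these tests.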
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166329#comment-16166329 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138853431 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); + + globalPool.recycleMemorySegments(segmentList1); + assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments()); + 
assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList2); + assertEquals(msg, 8, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList3); + assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(5, first.getNumBuffers()); + assertEquals(5, second.getNumBuffers()); + + globalPool.destroy(); --- End diff --

you also need to call `NetworkBufferPool#destroyAllBufferPools()` or `LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release their buffers
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166328#comment-16166328 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13514 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity 
of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); --- End diff --

unnecessary check - see above
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166325#comment-16166325 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13882 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); --- End diff -- here, you should also destroy the `globalPool`, i.e. 
call `globalPool.destroy()`
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166331#comment-16166331 ]

ASF GitHub Bot commented on FLINK-7378:
---
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138897087

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -333,7 +333,7 @@ public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
         InputChannel current = inputChannels.get(partitionId);

    -    if (current.getClass() == UnknownInputChannel.class) {
    +    if (current instanceof UnknownInputChannel) {
    --- End diff --

    Just to be on the safe side, you should also change this check in `#setInputChannel()` above. This way, we handle all sub-classes of `UnknownInputChannel` the same way as `UnknownInputChannel` itself.
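The difference between the two checks in the diff above only shows up for subclasses. A minimal sketch, using made-up stand-in classes (`UnknownChannel` and `SpecialUnknownChannel` are illustrative names, not Flink types):

```java
class InstanceofSketch {

    /** Hypothetical stand-in for a channel type. */
    static class UnknownChannel {}

    /** Hypothetical subclass that should be treated like its parent. */
    static class SpecialUnknownChannel extends UnknownChannel {}

    /** Matches only objects whose runtime class is exactly UnknownChannel. */
    static boolean exactClassCheck(Object channel) {
        return channel.getClass() == UnknownChannel.class;
    }

    /** Matches UnknownChannel and every subclass of it. */
    static boolean instanceofCheck(Object channel) {
        return channel instanceof UnknownChannel;
    }

    public static void main(String[] args) {
        Object sub = new SpecialUnknownChannel();
        System.out.println(exactClassCheck(sub)); // false
        System.out.println(instanceofCheck(sub)); // true
    }
}
```

With the exact-class comparison, a subclass instance would silently skip the branch, which is the case the reviewer wants handled consistently in both methods.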
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166330#comment-16166330 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138889183 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the 
capacity of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } --- End diff --

add `finally` with `globalPool.destroy()`
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166326#comment-16166326 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13440 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity 
of {@link NetworkBufferPool}. +*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); --- End diff --

If there was an exception, `memorySegments` will _always_ be the `Collections.emptyList()` you set before, so there's no need to check for its size.
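The reviewer's point is a general property of Java: if the right-hand side of an assignment throws, the assignment never happens, so the variable keeps its previous value. A small sketch with a hypothetical always-failing method (`alwaysThrows` is invented here purely for illustration):

```java
import java.util.Collections;
import java.util.List;

class AssignmentOnExceptionSketch {

    /** Hypothetical method that fails before it can return a list. */
    static List<String> alwaysThrows() {
        throw new IllegalArgumentException("requested count must be larger than zero");
    }

    /** Returns the value of the local variable after the failed call. */
    static List<String> valueAfterFailedCall() {
        List<String> segments = Collections.emptyList();
        try {
            // The call throws, so the assignment on this line is never completed.
            segments = alwaysThrows();
        } catch (IllegalArgumentException e) {
            // `segments` still refers to the empty list assigned before the try block.
        }
        return segments;
    }

    public static void main(String[] args) {
        System.out.println(valueAfterFailedCall().isEmpty()); // true
    }
}
```

This is why asserting `memorySegments.size() == 0` inside the `catch` block cannot fail and adds nothing to the test.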
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138891582 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. 
+*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } + } + + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently not containing the number of required free segments (currently occupied by a buffer pool). +*/ + @Test + public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { + final int numBuffers = 10; + +
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138889161 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. 
+*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } --- End diff -- add `finally` with `globalPool.destroy()` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13882 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); --- End diff -- here, you should also destroy the `globalPool`, i.e. call `globalPool.destroy()` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138879343 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java --- @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) { this.numTotalRequiredBuffers += numRequiredBuffers; - final List segments = new ArrayList<>(numRequiredBuffers); - for (int i = 0 ; i < numRequiredBuffers ; i++) { - segments.add(availableMemorySegments.poll()); - } + redistributeBuffers(); + } + final List segments = new ArrayList<>(numRequiredBuffers); + for (int i = 0 ; i < numRequiredBuffers ; i++) { try { - redistributeBuffers(); - } catch (IOException e) { - if (segments.size() > 0) { - recycleMemorySegments(segments); - } - + segments.add(availableMemorySegments.take()); --- End diff -- I know, I was the one who suggested it, but thinking about the blocking `take()` a bit more and with some more background I acquired over the last weeks, I'm getting the feeling, we should do the request similar to `LocalBufferPool#requestBuffer()` so that if (for some reason) we are waiting forever, we may at least be stopped by the `destroy()` function being called. Or what do you think? 
I'm thinking about something like this:
```
final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
try {
    while (segments.size() < numRequiredBuffers) {
        if (isDestroyed) {
            throw new IllegalStateException("Buffer pool is destroyed.");
        }

        final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
        if (segment != null) {
            segments.add(segment);
        }
    }
} catch (Throwable e) {
    recycleMemorySegments(segments);
    ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this behaviour:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
    final int numBuffers = 10;

    NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
    MemorySegment segment = globalPool.requestMemorySegment();
    assertNotNull(segment);

    final OneShotLatch isRunning = new OneShotLatch();
    CheckedThread asyncRequest = new CheckedThread() {
        @Override
        public void go() throws Exception {
            isRunning.trigger();
            globalPool.requestMemorySegments(10);
        }
    };
    asyncRequest.start();

    // We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
    // call above. We cannot guarantee this though but make it highly probable:
    isRunning.await();
    Thread.sleep(10);
    globalPool.destroy();

    segment.free();

    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage("destroyed");
    asyncRequest.sync();
}
```
---
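The polling idea proposed above can be tried out without Flink. The following is a minimal sketch under stated assumptions: `PollingPoolSketch`, its `byte[]` segments, the 50 ms timeout and the `demo()` helper are all hypothetical stand-ins and not part of `NetworkBufferPool`. It only illustrates how a timed `poll()` combined with a destroyed flag lets a request that can never be satisfied terminate, and how already-taken segments are handed back on failure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a buffer pool; NOT Flink's NetworkBufferPool. */
public class PollingPoolSketch {
    private final BlockingQueue<byte[]> available;
    private volatile boolean isDestroyed;

    public PollingPoolSketch(int numSegments) {
        available = new ArrayBlockingQueue<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[128]);
        }
    }

    /** Polls with a timeout so that a concurrent destroy() is noticed between polls. */
    public List<byte[]> request(int numRequired) throws IOException {
        final List<byte[]> segments = new ArrayList<>(numRequired);
        try {
            while (segments.size() < numRequired) {
                if (isDestroyed) {
                    throw new IllegalStateException("Buffer pool is destroyed.");
                }
                // Assumed timeout of 50 ms for the sketch; the review comment uses 2 s.
                final byte[] segment = available.poll(50, TimeUnit.MILLISECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
        } catch (InterruptedException e) {
            recycle(segments); // hand back what was taken so far
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (RuntimeException e) {
            recycle(segments); // hand back what was taken so far
            throw e;
        }
        return segments;
    }

    public void recycle(List<byte[]> segments) {
        available.addAll(segments);
    }

    public void destroy() {
        isDestroyed = true;
    }

    public int numAvailable() {
        return available.size();
    }

    /** Requests more segments than exist, then destroys the pool from another thread. */
    public static String demo() {
        final PollingPoolSketch pool = new PollingPoolSketch(2);
        final String[] outcome = {"no exception"};
        Thread requester = new Thread(() -> {
            try {
                pool.request(3); // only 2 segments exist, so this keeps polling
            } catch (Exception e) {
                outcome[0] = e.getClass().getSimpleName();
            }
        });
        requester.start();
        try {
            Thread.sleep(200);
            pool.destroy();
            requester.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0] + ", available=" + pool.numAvailable();
    }
}
```

With a blocking `take()` instead of the timed `poll()`, the requester thread in `demo()` would never re-check the flag and `join()` would not return.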
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13514 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. 
+*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); --- End diff -- unnecessary check - see above ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138897087 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java --- @@ -333,7 +333,7 @@ public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE InputChannel current = inputChannels.get(partitionId); - if (current.getClass() == UnknownInputChannel.class) { + if (current instanceof UnknownInputChannel) { --- End diff -- Just to be on the safe side, you should also change this check in `#setInputChannel()` above. This way, we handle all sub-classes of `UnknownInputChannel` the same way as `UnknownInputChannel` itself ---
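The difference between the two checks discussed above can be shown in isolation. This is a small sketch with hypothetical classes (`UnknownChannel` and `SpecialUnknownChannel` stand in for `UnknownInputChannel` and an assumed sub-class of it): an exact `getClass()` comparison rejects sub-classes, while `instanceof` accepts them.

```java
public class InstanceCheckSketch {
    /** Hypothetical stand-in for UnknownInputChannel. */
    public static class UnknownChannel {}

    /** Hypothetical sub-class, e.g. a test-specific variant. */
    public static class SpecialUnknownChannel extends UnknownChannel {}

    /** Matches only objects whose runtime class is exactly UnknownChannel. */
    public static boolean exactClassMatch(Object o) {
        return o.getClass() == UnknownChannel.class;
    }

    /** Matches UnknownChannel and all of its sub-classes. */
    public static boolean instanceOfMatch(Object o) {
        return o instanceof UnknownChannel;
    }

    public static void main(String[] args) {
        Object sub = new SpecialUnknownChannel();
        System.out.println(exactClassMatch(sub)); // false
        System.out.println(instanceOfMatch(sub)); // true
    }
}
```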
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138889183 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. 
+*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to +* cause exception. +*/ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } --- End diff -- add `finally` with `globalPool.destroy()` ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138852758 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); --- End diff -- for this test, `msg` is wrong as nothing has been recycled here (yet) ---
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r13440 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} +* currently containing the number of required free segments. +*/ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List segments = null; - // request buffers from global pool with illegal argument + List memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** +* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required +* buffers exceeding the capacity of {@link NetworkBufferPool}. 
+*/ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); --- End diff -- If there was an exception, `memorySegments` will _always_ be the `Collections.emptyList()` you set before, so there's no need to check for its size. ---
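The point made in this comment follows from Java's evaluation order: if the right-hand side of an assignment throws, the assignment itself never happens. A minimal sketch, with `failingRequest()` as a hypothetical stand-in for the `requestMemorySegments` call that throws:

```java
import java.util.Collections;
import java.util.List;

public class AssignmentOnThrowSketch {
    /** Hypothetical stand-in for a request that always fails. */
    static List<String> failingRequest() {
        throw new IllegalArgumentException("invalid number of segments");
    }

    /** Returns true if the variable still holds its initial value after the exception. */
    public static boolean unchangedAfterException() {
        final List<String> initial = Collections.emptyList();
        List<String> result = initial;
        try {
            result = failingRequest(); // throws before the assignment takes effect
        } catch (IllegalArgumentException e) {
            // nothing to do: 'result' was never reassigned
        }
        return result == initial;
    }

    public static void main(String[] args) {
        System.out.println(unchangedAfterException()); // true
    }
}
```

So asserting on the size of the list inside the `catch` block only re-checks the initial value.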
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138853431 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws IOException { globalPool.destroy(); } + /** +* Tests the interaction of requesting memory segments and creating local buffer pool and +* verifies the number of assigned buffers match after redistributing buffers because of newly +* requested memory segments or new buffer pools created. +*/ + @Test + public void testUniformDistributionBounded4() throws IOException { + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + + BufferPool first = globalPool.createBufferPool(0, 10); + assertEquals(10, first.getNumBuffers()); + + List segmentList1 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList1.size()); + assertEquals(8, first.getNumBuffers()); + + BufferPool second = globalPool.createBufferPool(0, 10); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + List segmentList2 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList2.size()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + List segmentList3 = globalPool.requestMemorySegments(2); + assertEquals(2, segmentList3.size()); + assertEquals(2, first.getNumBuffers()); + assertEquals(2, second.getNumBuffers()); + + String msg = "Did not return all buffers to network buffer pool after test."; + assertEquals(msg, 4, globalPool.getNumberOfAvailableMemorySegments()); + + globalPool.recycleMemorySegments(segmentList1); + assertEquals(msg, 6, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(3, first.getNumBuffers()); + assertEquals(3, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList2); + assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(4, first.getNumBuffers()); + assertEquals(4, second.getNumBuffers()); + + globalPool.recycleMemorySegments(segmentList3); + assertEquals(msg, 10, globalPool.getNumberOfAvailableMemorySegments()); + assertEquals(5, first.getNumBuffers()); + assertEquals(5, second.getNumBuffers()); + + globalPool.destroy(); --- End diff -- you also need to call `NetworkBufferPool#destroyAllBufferPools()` or `LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release their buffers ---
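The buffer counts asserted in `testUniformDistributionBounded4` above are consistent with a simple even split of whatever is left after the directly requested segments are subtracted. The sketch below only reproduces that arithmetic for this test's setup (identical local pools, no required buffers, maximum of 10 each); `EvenSplitSketch` and `buffersPerPool` are hypothetical names, and this is not a description of how `redistributeBuffers()` is actually implemented.

```java
public class EvenSplitSketch {
    /**
     * Buffers each local pool ends up with, assuming identical pools that share
     * the segments not handed out directly via requestMemorySegments().
     */
    public static int buffersPerPool(int total, int directlyRequested, int numPools) {
        return (total - directlyRequested) / numPools;
    }

    public static void main(String[] args) {
        System.out.println(buffersPerPool(10, 2, 1)); // 8: one pool, 2 segments requested
        System.out.println(buffersPerPool(10, 2, 2)); // 4: second pool created
        System.out.println(buffersPerPool(10, 4, 2)); // 3: 2 more segments requested
        System.out.println(buffersPerPool(10, 6, 2)); // 2: 2 more segments requested
        System.out.println(buffersPerPool(10, 0, 2)); // 5: everything recycled
    }
}
```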
[jira] [Created] (FLINK-7626) Add some metric description about checkpoints
Hai Zhou created FLINK-7626:
---
Summary: Add some metric description about checkpoints
Key: FLINK-7626
URL: https://issues.apache.org/jira/browse/FLINK-7626
Project: Flink
Issue Type: Bug
Components: Documentation, Metrics
Affects Versions: 1.3.2
Reporter: Hai Zhou
Assignee: Hai Zhou
Fix For: 1.4.0, 1.3.3

Add descriptions for the following metrics to the "Debugging & Monitoring / Metrics" part of the docs:
{noformat}
//Number of total checkpoints (in progress, completed, failed)
totalNumberOfCheckpoints
//Number of in progress checkpoints.
numberOfInProgressCheckpoints
//Number of successfully completed checkpoints
numberOfCompletedCheckpoints
//Number of failed checkpoints.
numberOfFailedCheckpoints
//Timestamp when the checkpoint was restored at the coordinator.
lastCheckpointRestoreTimestamp
//Buffered bytes during alignment over all subtasks.
lastCheckpointAlignmentBuffered
{noformat}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4658: [Flink-7596][Table API & SQL] fix bug when Set Ope...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4658#discussion_r138894127 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java --- @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.sql.type; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeFamily; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.util.Glossary; +import org.apache.calcite.util.Util; +import org.apache.flink.table.plan.schema.GenericRelDataType; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * SqlTypeFactoryImpl provides a default implementation of + * {@link RelDataTypeFactory} which supports SQL types. 
+ */ +public class SqlTypeFactoryImpl extends RelDataTypeFactoryImpl { + //~ Constructors --- + + public SqlTypeFactoryImpl(RelDataTypeSystem typeSystem) { + super(typeSystem); + } + + //~ Methods + + // implement RelDataTypeFactory + public RelDataType createSqlType(SqlTypeName typeName) { + if (typeName.allowsPrec()) { + return createSqlType(typeName, typeSystem.getDefaultPrecision(typeName)); + } + assertBasic(typeName); + RelDataType newType = new BasicSqlType(typeSystem, typeName); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createSqlType( + SqlTypeName typeName, + int precision) { + final int maxPrecision = typeSystem.getMaxPrecision(typeName); + if (maxPrecision >= 0 && precision > maxPrecision) { + precision = maxPrecision; + } + if (typeName.allowsScale()) { + return createSqlType(typeName, precision, typeName.getDefaultScale()); + } + assertBasic(typeName); + assert (precision >= 0) + || (precision == RelDataType.PRECISION_NOT_SPECIFIED); + RelDataType newType = new BasicSqlType(typeSystem, typeName, precision); + newType = SqlTypeUtil.addCharsetAndCollation(newType, this); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createSqlType( + SqlTypeName typeName, + int precision, + int scale) { + assertBasic(typeName); + assert (precision >= 0) + || (precision == RelDataType.PRECISION_NOT_SPECIFIED); + final int maxPrecision = typeSystem.getMaxPrecision(typeName); + if (maxPrecision >= 0 && precision > maxPrecision) { + precision = maxPrecision; + } + RelDataType newType = + new BasicSqlType(typeSystem, typeName, precision, scale); + newType = SqlTypeUtil.addCharsetAndCollation(newType, this); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createMultisetType( + RelDataType type, + long maxCardinality) { + assert maxCardinality == -1; + RelDataType newType = new MultisetSqlType(type, false); + return canonize(newType); + } + + public 
RelDataType createArrayType( + RelDataType elementType, + long maxCardinality) { + assert
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4665 These are not actually the places where you can count dropped data. I would suggest adding this at the very end of `processElement()`, where we also check whether we should side-output late data: https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L406. Also, I think a better metric name would include "dropped" somehow, because right now it just says "late". Something like `numLateElementsDropped`. ---
[jira] [Closed] (FLINK-6331) StreamingOperatorsITCase.testAsyncWaitOperator spuriously failing on Travis
[ https://issues.apache.org/jira/browse/FLINK-6331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-6331. Resolution: Cannot Reproduce Could not reproduce the problem. Will close the issue therefore. If this test failure reappears we should re-open this issue. > StreamingOperatorsITCase.testAsyncWaitOperator spuriously failing on Travis > --- > > Key: FLINK-6331 > URL: https://issues.apache.org/jira/browse/FLINK-6331 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: test-stability > > The {{StreamingOperatorsITCase.testAsyncWaitOperator}} fails sometimes on > Travis. > [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/223541348/log.txt -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5205) Asynchronous remove job messages can interfere with recovered or resubmitted jobs
[ https://issues.apache.org/jira/browse/FLINK-5205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-5205. Resolution: Won't Fix Seems not to be a problem and with the Flip-6 changes obsolete. > Asynchronous remove job messages can interfere with recovered or resubmitted > jobs > - > > Key: FLINK-5205 > URL: https://issues.apache.org/jira/browse/FLINK-5205 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > The {{JobManager}} executes the {{removeJob}} call at various places > asynchronously via the {{RemoveJob}} message, even though this is not > strictly needed. This is not necessary and can be done synchronously. The > advantage would be that the remove job operation won't interfere with > recovered or resubmitted jobs which have the same {{JobID}} as the job for > which the message was initially triggered. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-2733) ZooKeeperLeaderElectionTest.testZooKeeperReelection fails
[ https://issues.apache.org/jira/browse/FLINK-2733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-2733. Resolution: Not A Problem > ZooKeeperLeaderElectionTest.testZooKeeperReelection fails > - > > Key: FLINK-2733 > URL: https://issues.apache.org/jira/browse/FLINK-2733 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 0.10.0 >Reporter: Robert Metzger >Assignee: Till Rohrmann > Labels: test-stability > > I observed a test failure in this run: > https://travis-ci.org/rmetzger/flink/jobs/81571914 > {code} > testZooKeeperReelection(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) > Time elapsed: 109.794 sec <<< FAILURE! > java.lang.AssertionError: expected: but > was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelection(ZooKeeperLeaderElectionTest.java:171) > Results : > Failed tests: > ZooKeeperLeaderElectionTest.testZooKeeperReelection:171 > expected: but was: > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-2733) ZooKeeperLeaderElectionTest.testZooKeeperReelection fails
[ https://issues.apache.org/jira/browse/FLINK-2733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166301#comment-16166301 ] Till Rohrmann commented on FLINK-2733: -- I think [~tonycox], you actually did not use the fixed code when observing the test failure. Will close the issue therefore. > ZooKeeperLeaderElectionTest.testZooKeeperReelection fails > - > > Key: FLINK-2733 > URL: https://issues.apache.org/jira/browse/FLINK-2733 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 0.10.0 >Reporter: Robert Metzger >Assignee: Till Rohrmann > Labels: test-stability > > I observed a test failure in this run: > https://travis-ci.org/rmetzger/flink/jobs/81571914 > {code} > testZooKeeperReelection(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) > Time elapsed: 109.794 sec <<< FAILURE! > java.lang.AssertionError: expected: but > was: > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelection(ZooKeeperLeaderElectionTest.java:171) > Results : > Failed tests: > ZooKeeperLeaderElectionTest.testZooKeeperReelection:171 > expected: but was: > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4672: [Flink-7621][TableAPI & SQL] Fix Inconsistency of CaseSen...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4672 Thanks @Xpray. The changes look good, but I think if we really want the correct behavior, we need to make more changes in other places. Could you add some more tests to check case-insensitive behavior for tables, functions, and fields in different programs? ---
[jira] [Assigned] (FLINK-2253) ALS fails if the data flow is split up and the input is non-deterministic
[ https://issues.apache.org/jira/browse/FLINK-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-2253: Assignee: (was: Till Rohrmann) > ALS fails if the data flow is split up and the input is non-deterministic > - > > Key: FLINK-2253 > URL: https://issues.apache.org/jira/browse/FLINK-2253 > Project: Flink > Issue Type: Bug > Components: Machine Learning Library >Reporter: Till Rohrmann > Labels: ML > > If the {{temporaryPath}} of Flink's {{ALS}} implementation is set, then the > ALS job is executed one part after the other. This also includes the routing > information calculation for the user and item blocks. If the input > {{DataSet}} is calculated non-deterministically, e.g. if it's the result of a > {{first}} operation, then the resulting routing information won't be > consistent and ALS fails. > A solution would be to pin the input {{DataSet}} so that it will only be > executed once. Until we have this functionality, I propose to simply execute > the user and item routing information at the same time. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7618) Add BINARY supported in FlinkTypeFactory
[ https://issues.apache.org/jira/browse/FLINK-7618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166294#comment-16166294 ] ASF GitHub Bot commented on FLINK-7618: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4669 Thanks @sunjincheng121, but we cannot merge this without tests. Adding a new type needs tests, especially because the type has to be added in several places, e.g. in `FlinkRelNode` and in the code generator. > Add BINARY supported in FlinkTypeFactory > > > Key: FLINK-7618 > URL: https://issues.apache.org/jira/browse/FLINK-7618 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > We will get the following exception when we deal with the BINARY type. > {code} > org.apache.flink.table.api.TableException: Type is not supported: BINARY > at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) > at > org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:377) > at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:741) > at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:104) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4669: [FLINK-7618][table] Add BINARY supported in FlinkTypeFact...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4669 Thanks @sunjincheng121, but we cannot merge this without tests. Adding a new type needs tests, especially because the type has to be added in several places, e.g. in `FlinkRelNode` and in the code generator. ---
[jira] [Commented] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin
[ https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166291#comment-16166291 ] ASF GitHub Bot commented on FLINK-7617: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4668 > Remove string format in BitSet to improve the performance of > BuildSideOuterjoin > --- > > Key: FLINK-7617 > URL: https://issues.apache.org/jira/browse/FLINK-7617 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Jingsong Lee > > BuildSideOuterjoin frequently calls BitSet.set and BitSet.get, and each call runs the check > Preconditions.checkArgument(index < bitLength && index >= 0, > String.format("Input Index [%d] is larger than BitSet available size [%d].", index, bitLength)); > Because String.format is evaluated on every call, even when the check passes, it causes a sharp drop in performance. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
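The cost described in FLINK-7617 comes from Java evaluating method arguments before the call, so the message is formatted even when the check passes. The sketch below illustrates this with plain Java and no Flink dependency. The class `LazyCheckSketch`, its helper methods, and the `formatCalls` counter are hypothetical names introduced only for this illustration; this is not the change made in the pull request.

```java
import java.util.function.Supplier;

public class LazyCheckSketch {
    // Hypothetical counter, only here to make the number of formatting calls visible.
    static int formatCalls = 0;

    // Builds the error message; every call pays for String.format.
    static String describe(int index, int bitLength) {
        formatCalls++;
        return String.format(
                "Input Index [%d] is larger than BitSet available size [%d].", index, bitLength);
    }

    // Eager variant: the caller has already built the message before the check runs.
    static void checkEager(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Lazy variant: the message is only built when the check fails.
    static void checkLazy(boolean condition, Supplier<String> message) {
        if (!condition) {
            throw new IllegalArgumentException(message.get());
        }
    }

    public static void main(String[] args) {
        int bitLength = 64;

        for (int index = 0; index < bitLength; index++) {
            final int i = index;
            checkEager(i >= 0 && i < bitLength, describe(i, bitLength));
        }
        System.out.println("eager format calls: " + formatCalls); // prints 64

        formatCalls = 0;
        for (int index = 0; index < bitLength; index++) {
            final int i = index;
            checkLazy(i >= 0 && i < bitLength, () -> describe(i, bitLength));
        }
        System.out.println("lazy format calls: " + formatCalls); // prints 0
    }
}
```

In a hot path such as a per-record bit lookup, either dropping the message or deferring its construction keeps the passing case free of formatting work; which option fits best depends on how useful the message is for debugging.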
[GitHub] flink pull request #4668: [FLINK-7617] Remove string format in BitSet to imp...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4668 ---
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166289#comment-16166289 ] ASF GitHub Bot commented on FLINK-7491: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r138889015 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala --- @@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp canonize(relType) } + override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = { +val relType = new MultisetRelDataType( --- End diff -- There are multiple locations where a new type has to be added, such as `FlinkRelNode`. > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166288#comment-16166288 ] ASF GitHub Bot commented on FLINK-7491: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r13586 --- Diff: flink-core/pom.xml --- @@ -80,6 +80,13 @@ under the License. + + + org.apache.commons --- End diff -- We should not add additional dependencies to Flink just because of a new data type. There is also no reason behind choosing this library. Couldn't we just use a regular Java Map? Otherwise I would propose adding a class for our own type, as we did for `org.apache.flink.types.Row`. Calcite uses `List`, which is not very nice, but would also work. > Support COLLECT Aggregate function in Flink SQL > --- > > Key: FLINK-7491 > URL: https://issues.apache.org/jira/browse/FLINK-7491 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4585#discussion_r13586 --- Diff: flink-core/pom.xml --- @@ -80,6 +80,13 @@ under the License. + + + org.apache.commons --- End diff -- We should not add additional dependencies to Flink just because of a new data type. There is also no reason behind choosing this library. Couldn't we just use a regular Java Map? Otherwise I would propose adding a class for our own type, as we did for `org.apache.flink.types.Row`. Calcite uses `List`, which is not very nice, but would also work. ---
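The suggestion to use a plain Java Map instead of an extra library can be sketched as a map from element to occurrence count. This is only an illustration of the idea under that assumption, not the representation chosen in the pull request; the class `MapMultisetSketch` and its methods `add`, `count`, and `size` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class MapMultisetSketch {
    // Adds one occurrence of the element and returns its new count.
    static <T> int add(Map<T, Integer> multiset, T element) {
        return multiset.merge(element, 1, Integer::sum);
    }

    // Returns how often the element occurs; absent elements count as zero.
    static <T> int count(Map<T, Integer> multiset, T element) {
        return multiset.getOrDefault(element, 0);
    }

    // Returns the total number of occurrences across all elements.
    static <T> int size(Map<T, Integer> multiset) {
        int total = 0;
        for (int occurrences : multiset.values()) {
            total += occurrences;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> collected = new HashMap<>();
        for (String value : new String[] {"a", "b", "a", "c", "a"}) {
            add(collected, value);
        }
        System.out.println(count(collected, "a")); // prints 3
        System.out.println(count(collected, "z")); // prints 0
        System.out.println(size(collected));       // prints 5
    }
}
```

A count map keeps duplicates compact and needs no new dependency, whereas a `List`-based representation stores every occurrence separately.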