[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167365#comment-16167365
 ] 

ASF GitHub Bot commented on FLINK-7386:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4675
  
@rmetzger @tzulitai I created this PR to fix 
[FLINK-7386](https://issues.apache.org/jira/browse/FLINK-7386). The PR does the following:

1. Add a `createRequestIndex` method to `ElasticsearchApiCallBridge`, so that this method 
can be used to create different `RequestIndexer` instances.

2. Add a flink-connector-elasticsearch5.3 project and create a `BulkProcessorIndexer` in it; 
each `ActionRequest` is converted to a `DocWriteRequest` inside that `BulkProcessorIndexer`.

What do you think? Please have a look when you're free, thanks.
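For illustration only, the conversion described in point 2 might look roughly like the 
sketch below. The class name and constructor are assumptions; only the `RequestIndexer` 
interface from the base connector and the Elasticsearch client types are taken as given.

{code:java}
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkProcessor;

// Hedged sketch: a RequestIndexer for ES 5.3+ that hands requests to the
// BulkProcessor as DocWriteRequest instead of ActionRequest.
class Es53BulkProcessorIndexer implements RequestIndexer {

    private final BulkProcessor bulkProcessor;

    Es53BulkProcessorIndexer(BulkProcessor bulkProcessor) {
        this.bulkProcessor = bulkProcessor;
    }

    @Override
    public void add(ActionRequest... requests) {
        for (ActionRequest request : requests) {
            // Since ES 5.2, BulkProcessor#add expects a DocWriteRequest; the concrete
            // requests (IndexRequest, DeleteRequest, UpdateRequest) implement it.
            bulkProcessor.add((DocWriteRequest) request);
        }
    }
}
{code}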


> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>Assignee: Fang Yong
>
> In the Elasticsearch 5.2.0 client, the {{BulkProcessor}} class was refactored and 
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4675: [FLINK-7386] Fix Elasticsearch 5 connector is not compati...

2017-09-14 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4675
  
@rmetzger @tzulitai I created this PR to fix 
[FLINK-7386](https://issues.apache.org/jira/browse/FLINK-7386). The PR does the following:

1. Add a `createRequestIndex` method to `ElasticsearchApiCallBridge`, so that this method 
can be used to create different `RequestIndexer` instances.

2. Add a flink-connector-elasticsearch5.3 project and create a `BulkProcessorIndexer` in it; 
each `ActionRequest` is converted to a `DocWriteRequest` inside that `BulkProcessorIndexer`.

What do you think? Please have a look when you're free, thanks.


---


[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167357#comment-16167357
 ] 

ASF GitHub Bot commented on FLINK-7386:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4675

[FLINK-7386] Fix Elasticsearch 5 connector is not compatible with 
Elasticsearch 5.2+ client

## What is the purpose of the change

Add flink-connector-elasticsearch5.3 to support Elasticsearch 5.3 and later versions.

## Brief change log
  - *Add createRequestIndex method in ElasticsearchApiCallBridge*
  - *Add flink-connector-elasticsearch5.3 project*
  - *Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert 
ActionRequest to DocWriteRequest*

## Verifying this change
This change added tests and can be verified as follows:
  - *Add ElasticsearchSinkITCase test case*
  - *Add ElasticsearchSinkExample in connector-elasticsearch5.3 to send requests to Elasticsearch 5.3 and later versions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7386

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4675


commit 9dd30ccb5f5bd9940c8f1cfea4ffeb256d564862
Author: zjureel 
Date:   2017-09-15T03:51:35Z

[FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add 
createRequestIndex method

commit 5936bc7734557f75dc4b0c06cfc31b0b0e49a91a
Author: zjureel 
Date:   2017-09-15T03:55:16Z

[FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and 
later versions

commit dbc87bb1dd361a2e840bd382814a780aa96a45c2
Author: zjureel 
Date:   2017-09-15T04:42:44Z

[FLINK-7386] add test case for ES53

commit 74972ab9798e4e9173ad4cc4a6ec6bdf1390f98a
Author: zjureel 
Date:   2017-09-15T05:33:43Z

[FLINK-7386] add document for ES5.3




> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>Assignee: Fang Yong
>
> In the Elasticsearch 5.2.0 client, the {{BulkProcessor}} class was refactored and 
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4675: [FLINK-7386] Fix Elasticsearch 5 connector is not ...

2017-09-14 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4675

[FLINK-7386] Fix Elasticsearch 5 connector is not compatible with 
Elasticsearch 5.2+ client

## What is the purpose of the change

Add flink-connector-elasticsearch5.3 to support Elasticsearch 5.3 and later versions.

## Brief change log
  - *Add createRequestIndex method in ElasticsearchApiCallBridge*
  - *Add flink-connector-elasticsearch5.3 project*
  - *Add BulkProcessorIndexer in connector-elasticsearch5.3 to convert 
ActionRequest to DocWriteRequest*

## Verifying this change
This change added tests and can be verified as follows:
  - *Add ElasticsearchSinkITCase test case*
  - *Add ElasticsearchSinkExample in connector-elasticsearch5.3 to send requests to Elasticsearch 5.3 and later versions*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7386

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4675


commit 9dd30ccb5f5bd9940c8f1cfea4ffeb256d564862
Author: zjureel 
Date:   2017-09-15T03:51:35Z

[FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add 
createRequestIndex method

commit 5936bc7734557f75dc4b0c06cfc31b0b0e49a91a
Author: zjureel 
Date:   2017-09-15T03:55:16Z

[FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and 
later versions

commit dbc87bb1dd361a2e840bd382814a780aa96a45c2
Author: zjureel 
Date:   2017-09-15T04:42:44Z

[FLINK-7386] add test case for ES53

commit 74972ab9798e4e9173ad4cc4a6ec6bdf1390f98a
Author: zjureel 
Date:   2017-09-15T05:33:43Z

[FLINK-7386] add document for ES5.3




---


[jira] [Commented] (FLINK-7627) SingleElementIterable should implement with Serializable

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167317#comment-16167317
 ] 

ASF GitHub Bot commented on FLINK-7627:
---

GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/4674

[FLINK-7627] [table] SingleElementIterable should implement with 
Serializable


## What is the purpose of the change

*This pull request is a bugfix that makes `SingleElementIterable` implement `Serializable`*


## Brief change log

  - *Implement SingleElementIterable with Serializable*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added a test that validates that `SingleElementIterable` can be serialized; otherwise an exception would be thrown*
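As a rough illustration of the fix (not the actual Flink class; the name and structure 
below are assumptions), a serializable single-element iterable could look like this:

{code:java}
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hedged sketch: a reusable single-element Iterable that is also Serializable,
// so instances stored in checkpointed state can be snapshotted without errors.
public class SerializableSingleElementIterable<T> implements Iterable<T>, Serializable {

    private static final long serialVersionUID = 1L;

    private T element;

    public void set(T element) {
        this.element = element;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private boolean consumed = false;

            @Override
            public boolean hasNext() {
                return !consumed;
            }

            @Override
            public T next() {
                if (consumed) {
                    throw new NoSuchElementException();
                }
                consumed = true;
                return element;
            }
        };
    }
}
{code}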

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 7627

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4674.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4674


commit b1af2d612265f0727048388fea7695f42ba897cd
Author: 军长 
Date:   2017-09-15T03:10:49Z

[FLINK-7627] [table] SingleElementIterable should implement with 
Serializable




> SingleElementIterable should implement with Serializable 
> -
>
> Key: FLINK-7627
> URL: https://issues.apache.org/jira/browse/FLINK-7627
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>
> {{SingleElementIterable}} is used to merge accumulators, and it should be 
> serializable because it will be serialized when a checkpoint is taken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4674: [FLINK-7627] [table] SingleElementIterable should ...

2017-09-14 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/4674

[FLINK-7627] [table] SingleElementIterable should implement with 
Serializable


## What is the purpose of the change

*This pull request is a bugfix that makes `SingleElementIterable` implement `Serializable`*


## Brief change log

  - *Implement SingleElementIterable with Serializable*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added a test that validates that `SingleElementIterable` can be serialized; otherwise an exception would be thrown*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 7627

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4674.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4674


commit b1af2d612265f0727048388fea7695f42ba897cd
Author: 军长 
Date:   2017-09-15T03:10:49Z

[FLINK-7627] [table] SingleElementIterable should implement with 
Serializable




---


[jira] [Created] (FLINK-7627) SingleElementIterable should implement with Serializable

2017-09-14 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-7627:
--

 Summary: SingleElementIterable should implement with Serializable 
 Key: FLINK-7627
 URL: https://issues.apache.org/jira/browse/FLINK-7627
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


{{SingleElementIterable}} is used to merge accumulators, and it should be 
serializable because it will be serialized when a checkpoint is taken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5858) Support multiple sinks in same execution DAG

2017-09-14 Thread godfrey he (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-5858:
-

Assignee: godfrey he

> Support multiple sinks in same execution DAG
> 
>
> Key: FLINK-5858
> URL: https://issues.apache.org/jira/browse/FLINK-5858
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> When the writeToSink method is called to write a Table (backed by a TableSource) to a 
> TableSink, the Table is translated into a DataSet or DataStream. If writeToSink is called 
> more than once (writing to different sinks), the Table is translated more than once and 
> the final execution graph is split into separate DAGs. For example:
> {code:title=Example.scala|borderStyle=solid}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val csvTableSource = new CsvTableSource(
>   "/tmp/words",
>   Array("first", "id", "score", "last"),
>   Array(
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.DOUBLE_TYPE_INFO,
> BasicTypeInfo.STRING_TYPE_INFO
>   ),
>   fieldDelim = "#"
> )
> tEnv.registerTableSource("csv_source", csvTableSource)
> val resultTable = tEnv.scan("csv_source")
>   .groupBy('first)
>   .select('first, 'score.sum)
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
> println(tEnv.explain(resultTable))
> {code}
> Results:
> == Abstract Syntax Tree ==
> LogicalProject(first=[$0], TMP_1=[$1])
>   LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
> LogicalProject(first=[$0], score=[$2])
>   LogicalTableScan(table=[[csv_source]])
> == Optimized Logical Plan ==
> DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
>   BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
> == Physical Execution Plan ==
> {color:red}
> Stage 6 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>   Stage 5 : Map
>   content : prepare select: (first, SUM(score) AS TMP_0)
>   ship_strategy : Forward
>   exchange_mode : PIPELINED
>   driver_strategy : Map
>   Partitioning : RANDOM_PARTITIONED
>   Stage 4 : GroupCombine
>   content : groupBy: (first), select: (first, SUM(score) 
> AS TMP_0)
>   ship_strategy : Forward
>   exchange_mode : PIPELINED
>   driver_strategy : Sorted Combine
>   Partitioning : RANDOM_PARTITIONED
>   Stage 3 : GroupReduce
>   content : groupBy: (first), select: (first, 
> SUM(score) AS TMP_0)
>   ship_strategy : Hash Partition on [0]
>   exchange_mode : PIPELINED
>   driver_strategy : Sorted Group Reduce
>   Partitioning : RANDOM_PARTITIONED
>   Stage 2 : Map
>   content : to: Row(f0: String, f1: 
> Double)
>   ship_strategy : Forward
>   exchange_mode : PIPELINED
>   driver_strategy : Map
>   Partitioning : RANDOM_PARTITIONED
>   Stage 1 : Map
>   content : Map at 
> emitDataSet(CsvTableSink.scala:67)
>   ship_strategy : Forward
>   exchange_mode : PIPELINED
>   driver_strategy : Map
>   Partitioning : 
> RANDOM_PARTITIONED
>   Stage 0 : Data Sink
>   content : 
> TextOutputFormat (/tmp/wordcount1) - UTF-8
>   ship_strategy : Forward
>   exchange_mode : 
> PIPELINED
>   Partitioning : 
> RANDOM_PARTITIONED
> {color:red}
> Stage 13 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>   Stage 12 : Map
>   content : prepare select: (first, SUM(score) AS TMP_0)
>   ship_strategy : Forward
>   exchange_mode : 

[jira] [Comment Edited] (FLINK-7606) CEP operator leaks state

2017-09-14 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167242#comment-16167242
 ] 

Dian Fu edited comment on FLINK-7606 at 9/15/17 2:18 AM:
-

I think this is by design; otherwise users could not get the timed-out partially 
matched sequences, as they may be cleared by the timers. Regarding the memory leak 
issue, it seems that you are using the {{MemoryStateBackend}}. Is it used only for 
testing, or will it also be used in your production environment? I think with 
{{RocksDBStateBackend}} there will be no memory leak issue. 


was (Author: dian.fu):
I think this is by design otherwise users could not get the timed out partially 
matched sequences as they will be cleared by the timers. For the memory leak 
issue, it seems that you are using the {{MemoryStateBackend}}, is it just used 
for test or will also used in your product environment? I think with 
{{RocksDBStateBackend}}, there will be no memory leak issue. 
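For context, switching the job from the in-memory backend to RocksDB is a small 
configuration change; a minimal sketch is below (the checkpoint URI and intervals are 
placeholders, and the flink-statebackend-rocksdb dependency is assumed to be on the classpath):

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep large keyed state (e.g. CEP NFA state) in RocksDB instead of on the JVM heap;
        // the checkpoint URI below is just a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every minute

        // ... build the CEP job here ...

        env.execute("cep-job-with-rocksdb");
    }
}
{code}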

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: if we send messages that don't match 
> the pattern, the memory grows (I suppose the NFA state is updated) but it is 
> not cleaned up even after the 5-minute window defined in the "within" clause.
> If needed, I can provide more details about the job we've implemented and 
> screenshots of the memory leak.
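A hedged reconstruction of the kind of job described above is sketched below; the 
"Message" type, its fields, and the conditions are placeholders, not the reporter's 
actual code:

{code:java}
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CepWithinExample {

    // Simplified stand-in for the reporter's message type.
    public static class Message {
        public String key;
        public String type;
    }

    public static PatternStream<Message> buildPattern(DataStream<Message> messages) {
        Pattern<Message, ?> pattern = Pattern.<Message>begin("first")
            .where(new SimpleCondition<Message>() {
                @Override
                public boolean filter(Message m) {
                    return "START".equals(m.type);
                }
            })
            .next("second")
            .where(new SimpleCondition<Message>() {
                @Override
                public boolean filter(Message m) {
                    return "END".equals(m.type);
                }
            })
            // Two messages only match if they arrive within 5 minutes of each other.
            .within(Time.minutes(5));

        // Partial matches are tracked per key, which is where the NFA state accumulates.
        return CEP.pattern(messages.keyBy("key"), pattern);
    }
}
{code}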



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-14 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167242#comment-16167242
 ] 

Dian Fu commented on FLINK-7606:


I think this is by design; otherwise users could not get the timed-out partially 
matched sequences, as they will be cleared by the timers. Regarding the memory leak 
issue, it seems that you are using the {{MemoryStateBackend}}. Is it used only for 
testing, or will it also be used in your production environment? I think with 
{{RocksDBStateBackend}} there will be no memory leak issue. 

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: if we send messages that don't match 
> the pattern, the memory grows (I suppose the NFA state is updated) but it is 
> not cleaned up even after the 5-minute window defined in the "within" clause.
> If needed, I can provide more details about the job we've implemented and 
> screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167224#comment-16167224
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049572
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
--- End diff --

We should make this flag clearer. If it represents whether partition pruning has been 
applied, I would say it should always be true, because by the time this method is called 
the framework has at least tried to apply partition pruning.


> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>   

[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049572
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
--- End diff --

We should make this flag clearer. If it represents whether partition pruning has been 
applied, I would say it should always be true, because by the time this method is called 
the framework has at least tried to apply partition pruning.


---


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167219#comment-16167219
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049316
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
+* @param prunedPartitions Remaining partitions after partition pruning 
applied.
--- End diff --

Looks like the definition of "prunedPartitions" is contradictory here. I think 
we should stick to a single definition: either "prunedPartitions" represents all 
partitions that have been pruned away, or all remaining partitions that survive 
the pruning.


> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: 

[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049316
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
+* @param prunedPartitions Remaining partitions after partition pruning 
applied.
--- End diff --

Looks like the definition of "prunedPartitions" is contradictory here. I think 
we should stick to a single definition: either "prunedPartitions" represents all 
partitions that have been pruned away, or all remaining partitions that survive 
the pruning.


---


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167216#comment-16167216
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048604
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
+* @param prunedPartitions Remaining partitions after partition pruning 
applied.
+* Notes: If partition pruning is not applied, 
prunedPartitions is empty.
+* @param predicates   A list contains conjunctive predicates, you 
should pick and remove all
+* expressions that can be pushed down. The 
remaining elements of this
+* list will further evaluated by framework.
+* @return A new cloned instance of [[TableSource]].
+  

[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048604
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, 
PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partition table,
+  * and will get the relevant partitions about the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends 
FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+* Get all partitions belong to this table
+*
+* @return All partitions belong to this table
+*/
+  def getAllPartitions: JList[Partition]
+
+  /**
+* Get partition field names.
+*
+* @return Partition field names.
+*/
+  def getPartitionFieldNames: Array[String]
+
+  /**
+* Get partition field types.
+*
+* @return Partition field types.
+*/
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+* Whether drop partition predicates after apply partition pruning.
+*
+* @return true only if the result is correct without partition 
predicate
+*/
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+* @return Pruned partitions
+*/
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+* @return True if apply partition pruning
+*/
+  def isPartitionPruned: Boolean
+
+  /**
+* If a partitionable table source which can't apply non-partition 
filters should not pick any
+* predicates.
+* If a partitionable table source which can apply non-partition 
filters should check and pick
+* only predicates this table source can support.
+*
+* After trying to push pruned-partitions and predicates down, we 
should return a new
+* [[TableSource]] instance which holds all pruned-partitions and all 
pushed down predicates.
+* Even if we actually pushed nothing down, it is recommended that we 
still return a new
+* [[TableSource]] instance since we will mark the returned instance as 
filter push down has
+* been tried.
+* 
+* We also should note to not changing the form of the predicates 
passed in. It has been
+* organized in CNF conjunctive form, and we should only take or leave 
each element from the
+* list. Don't try to reorganize the predicates if you are absolutely 
confident with that.
+*
+* @param partitionPruned  Whether partition pruning is applied.
+* @param prunedPartitions Remaining partitions after partition pruning 
applied.
+* Notes: If partition pruning is not applied, 
prunedPartitions is empty.
+* @param predicates   A list contains conjunctive predicates, you 
should pick and remove all
+* expressions that can be pushed down. The 
remaining elements of this
+* list will further evaluated by framework.
+* @return A new cloned instance of [[TableSource]].
+*/
+  def applyPrunedPartitionsAndPredicate(
+partitionPruned: Boolean,
+prunedPartitions: JList[Partition],
+predicates: JList[Expression]): TableSource[T]
+
+
+  /**
+* Check and pick 

[jira] [Commented] (FLINK-4047) Fix documentation about determinism of KeySelectors

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167214#comment-16167214
 ] 

ASF GitHub Bot commented on FLINK-4047:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/4659#discussion_r139048412
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java 
---
@@ -36,7 +37,7 @@
 public interface KeySelector<IN, KEY> extends Function, Serializable {
 
/**
-* User-defined function that extracts the key from an arbitrary object.
+* User-defined function that extracts the key from an deterministic 
object.
--- End diff --

Okay. Thanks. I will correct it soon.
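For reference, the documented contract is that the key must be a deterministic function 
of the input; a minimal example (the comma-separated field split is hypothetical) would be:

{code:java}
import org.apache.flink.api.java.functions.KeySelector;

// Hedged example of a deterministic KeySelector: repeated calls on the same
// element always return the same key, with no randomness or external state.
public class FirstFieldKeySelector implements KeySelector<String, String> {

    @Override
    public String getKey(String line) {
        // The key is derived purely from the input record.
        return line.split(",")[0];
    }
}
{code}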


> Fix documentation about determinism of KeySelectors
> ---
>
> Key: FLINK-4047
> URL: https://issues.apache.org/jira/browse/FLINK-4047
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.0.3
>Reporter: Fabian Hueske
>Priority: Blocker
>  Labels: starter
> Fix For: 1.4.0
>
>
> KeySelectors must return deterministic keys, i.e., if invoked multiple times 
> on the same object, the returned key must be the same.
> The documentation about this aspect is broken ("The key can be of any type 
> and be derived from arbitrary computations.").
> We need to fix the JavaDocs of the {{KeySelector}} interface and the web 
> documentation 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#specifying-keys).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4659: [FLINK-4047] Fix documentation about determinism o...

2017-09-14 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/4659#discussion_r139048412
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java 
---
@@ -36,7 +37,7 @@
 public interface KeySelector<IN, KEY> extends Function, Serializable {
 
/**
-* User-defined function that extracts the key from an arbitrary object.
+* User-defined function that extracts the key from an deterministic 
object.
--- End diff --

Okay. Thanks. I will correct it soon.


---


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167213#comment-16167213
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048337
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
 ---
@@ -55,4 +55,9 @@ trait FilterableTableSource[T] {
 */
   def isFilterPushedDown: Boolean
 
+  /**
+* @param relBuilder Builder for relational expressions.
+*/
+  def setRelBuilder(relBuilder: RelBuilder): Unit
--- End diff --

Can you move this method to PartitionableTableSource?


> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many 
> queries just need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be noticeably 
> reduced, especially for a large partitioned table.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4667: [FLINK-5859] [table] Add PartitionableTableSource ...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048337
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala
 ---
@@ -55,4 +55,9 @@ trait FilterableTableSource[T] {
 */
   def isFilterPushedDown: Boolean
 
+  /**
+* @param relBuilder Builder for relational expressions.
+*/
+  def setRelBuilder(relBuilder: RelBuilder): Unit
--- End diff --

Can you move this method to PartitionableTableSource?


---


[jira] [Closed] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin

2017-09-14 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-7617.
-
   Resolution: Fixed
Fix Version/s: 1.3.3
   1.4.0

> Remove string format in BitSet to improve the performance of 
> BuildSideOuterjoin
> ---
>
> Key: FLINK-7617
> URL: https://issues.apache.org/jira/browse/FLINK-7617
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
> Fix For: 1.4.0, 1.3.3
>
>
> When using BuildSideOuterjoin, BitSet.set and get are called frequently, and each 
> call runs a check like
> Preconditions.checkArgument(index >= 0,
> String.format("Input Index [%d] is larger than BitSet available size [%d].", index, bitLength));
> In this check, String.format is evaluated on every call and leads to a sharp performance decline.
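The fix described in the title boils down to not paying for String.format on every call; 
a hedged sketch of that idea (not the exact patch; field names and layout are assumptions) 
follows:

{code:java}
// Hedged sketch, not the exact patch: keep the bounds check, but format the
// error message only when the check actually fails.
public class SimpleBitSet {

    private final long[] words;
    private final int bitLength;

    public SimpleBitSet(int bitLength) {
        this.bitLength = bitLength;
        this.words = new long[(bitLength + 63) >>> 6];
    }

    public void set(int index) {
        checkIndex(index);
        words[index >>> 6] |= 1L << (index & 63);
    }

    public boolean get(int index) {
        checkIndex(index);
        return (words[index >>> 6] & (1L << (index & 63))) != 0;
    }

    private void checkIndex(int index) {
        if (index < 0 || index >= bitLength) {
            // String concatenation happens only on the failure path.
            throw new IllegalArgumentException(
                "Input Index [" + index + "] is larger than BitSet available size [" + bitLength + "].");
        }
    }
}
{code}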



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167179#comment-16167179
 ] 

ASF GitHub Bot commented on FLINK-7617:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4668
  
@fhueske sure


> Remove string format in BitSet to improve the performance of 
> BuildSideOuterjoin
> ---
>
> Key: FLINK-7617
> URL: https://issues.apache.org/jira/browse/FLINK-7617
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> When using BuildSideOuterjoin, BitSet.set and get are called frequently, and each 
> call runs a check like
> Preconditions.checkArgument(index >= 0,
> String.format("Input Index [%d] is larger than BitSet available size [%d].", index, bitLength));
> In this check, String.format is evaluated on every call and leads to a sharp performance decline.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...

2017-09-14 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4668
  
@fhueske sure


---


[jira] [Commented] (FLINK-3991) Remove deprecated configuration keys from ConfigConstants

2017-09-14 Thread Chetna Chaudhari (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167060#comment-16167060
 ] 

Chetna Chaudhari commented on FLINK-3991:
-

Is it still open? Can I work on it?

> Remove deprecated configuration keys from ConfigConstants
> -
>
> Key: FLINK-3991
> URL: https://issues.apache.org/jira/browse/FLINK-3991
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Robert Metzger
> Fix For: 2.0.0
>
>
> In 
> https://github.com/apache/flink/commit/b0acd97935cd21843bac3b9b5afa3662b52bb95d#diff-40616c4678c3fbfe07c0701505ce0567
>  I deprecated some configuration keys.
> They are unused and need to be removed with the 2.0 release.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...

2017-09-14 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4673
  
Hi @dawidwys,
I just found this issue in the after-match feature, which causes 
AfterMatchSkipStrategy not to work. It may have been introduced while resolving merge 
conflicts. I addressed it in this PR and added a simple test to catch such 
problems. Please merge this into the repository. Thanks.


---


[jira] [Commented] (FLINK-7622) Respect local KPL queue size in FlinkKinesisProducer when adding records to KPL client

2017-09-14 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166925#comment-16166925
 ] 

Bowen Li commented on FLINK-7622:
-

[~tzulitai] I think FLINK-7508 indirectly solves this issue, since FLINK-7508 
greatly increased the throughput of the Kinesis producer. Here's an example:

https://imgur.com/a/u198h  These are the metrics of our prod Flink job: UserRecords Put 
(i.e., UserRecords put into the KPL queue) and user_records_pending (i.e., outstanding 
UserRecords waiting in the queue to be sent to AWS).

With the per_request threading model, a small number (~0.5 million) of 
UserRecords can cause a huge number (~15k) of pending records because the 
throughput is so low. After switching to the pooled threading model, you can see 
that the number of outstanding UserRecords has dropped significantly (~0) even 
though the number of UserRecords put into the queue grew 16x (~8 million at peak). 
Looking at our user_records_pending metric for the past two weeks at 
https://imgur.com/a/2YxIm, the number of outstanding records is consistently under 
150 (impressive, right?). 

Thus, I believe propagating backpressure upstream in FlinkKinesisProducer 
is no longer necessary.

> Respect local KPL queue size in FlinkKinesisProducer when adding records to 
> KPL client
> --
>
> Key: FLINK-7622
> URL: https://issues.apache.org/jira/browse/FLINK-7622
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> This issue was brought to discussion by [~sthm] offline.
> Currently, records are added to the Kinesis KPL producer client without 
> checking the number of outstanding records within the local KPL queue. This 
> manner is basically neglecting backpressure when producing to Kinesis through 
> KPL, and can therefore exhaust system resources.
> We should respect {{producer.getOutstandingRecordsCount()}} as a measure of 
> backpressure, and propagate backpressure upstream by blocking further sink 
> invocations when some threshold of outstanding record count is exceeded. The 
> recommended threshold [1] seems to be 10,000.
> [1] 
> https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/
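A hedged sketch of what "blocking further sink invocations" could look like; 
KplBackpressureGuard, the 10 ms sleep, and the waitForCapacity() helper are 
illustrative choices, not the connector's actual code.

{code:java}
import com.amazonaws.services.kinesis.producer.KinesisProducer;

// Sketch of the proposed behaviour: block the sink invocation until the local
// KPL queue drains below a threshold, which propagates backpressure upstream.
public final class KplBackpressureGuard {

    private static final int MAX_OUTSTANDING_RECORDS = 10_000; // threshold suggested in [1]

    private final KinesisProducer producer;

    public KplBackpressureGuard(KinesisProducer producer) {
        this.producer = producer;
    }

    /** Call this before handing a record to the KPL client. */
    public void waitForCapacity() throws InterruptedException {
        while (producer.getOutstandingRecordsCount() >= MAX_OUTSTANDING_RECORDS) {
            Thread.sleep(10); // crude wait; a real implementation might use flush callbacks instead
        }
    }
}
{code}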



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166783#comment-16166783
 ] 

ASF GitHub Bot commented on FLINK-7508:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4656
  
@tzulitai Thanks, Gordon! I watched your presentation on "managing state of 
Flink", and you did a great job explaining all details.

I added an option for the PER_REQUEST model since it doesn't hurt anything. 

This PR is for 1.4. There's [another PR 
here](https://github.com/apache/flink/pull/4657) that I think needs to be merged to 
both 1.4 and 1.3.x.


> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21million UserRecords, which means that we generated a 
> test load of 21million UserRecords at the first minute of each hour.
> * Criteria: Test KPL throughput per minute. Since the default RecordTTL for 
> KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL 
> within a minute, or we will see UserRecord expiration errors.
> * One-New-Thread-Per-Request model: max throughput is about 2million 
> UserRecords per min; it doesn't go beyond that because CPU utilization goes 
> to 100%, everything stopped working and that Flink job crashed.
> * Thread-Pool model with pool size of 10: it sends out 21million UserRecords 
> within 30 sec without any UserRecord expiration errors. The average peak CPU 
> utilization is about 20% - 30%. So 21million UserRecords/min is not the max 
> throughput of thread-pool model. We didn't go any further because 1) this 
> throughput is already a couple times more than what we really need, and 2) we 
> don't have a quick way of increasing the test load
> Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. 
> [~tzulitai] What do you think



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer to use ...

2017-09-14 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4656
  
@tzulitai Thanks, Gordon! I watched your presentation on "managing state of 
Flink", and you did a great job explaining all details.

I added an option for the PER_REQUEST model since it doesn't hurt anything. 

This PR is for 1.4. There's [another PR 
here](https://github.com/apache/flink/pull/4657) that I think needs to be merged to 
both 1.4 and 1.3.x.


---


[jira] [Commented] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166709#comment-16166709
 ] 

ASF GitHub Bot commented on FLINK-7508:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4656#discussion_r138965647
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -171,8 +171,9 @@ public void open(Configuration parameters) throws 
Exception {
super.open(parameters);
 
// check and pass the configuration properties
-   KinesisProducerConfiguration producerConfig = 
KinesisConfigUtil.validateProducerConfiguration(configProps);
+   KinesisProducerConfiguration producerConfig = 
KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
+   
producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
--- End diff --

On second thought: although I believe the POOLED model is the best option for 
most of the use cases I can think of, we should give users the flexibility to 
make that decision themselves.

Adding the PER_REQUEST model as an option.
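A rough sketch of how such an option might be wired up from the producer 
properties; the property key "aws.producer.threadingModel" is made up for 
illustration and may not match the connector's actual configuration key.

{code:java}
import java.util.Properties;

import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

// Illustrative sketch only: read a user-supplied property (hypothetical key) and
// apply the corresponding KPL threading model, defaulting to POOLED.
public final class ThreadingModelOption {

    public static void apply(KinesisProducerConfiguration config, Properties props) {
        String model = props.getProperty("aws.producer.threadingModel", "POOLED");
        // valid values: POOLED or PER_REQUEST
        config.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.valueOf(model));
    }
}
{code}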


> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21million UserRecords, which means that we generated a 
> test load of 21million UserRecords at the first minute of each hour.
> * Criteria: Test KPL throughput per minute. Since the default RecordTTL for 
> KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL 
> within a minute, or we will see UserRecord expiration errors.
> * One-New-Thread-Per-Request model: max throughput is about 2million 
> UserRecords per min; it doesn't go beyond that because CPU utilization goes 
> to 100%, everything stopped working and that Flink job crashed.
> * Thread-Pool model with pool size of 10: it sends out 21million UserRecords 
> within 30 sec without any UserRecord expiration errors. The average peak CPU 
> utilization is about 20% - 30%. So 21million UserRecords/min is not the max 
> throughput of thread-pool model. We didn't go any further because 1) this 
> throughput is already a couple times more than what we really need, and 2) we 
> don't have a quick way of increasing the test load
> Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. 
> [~tzulitai] What do you think



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4656: [FLINK-7508][kinesis] switch FlinkKinesisProducer ...

2017-09-14 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4656#discussion_r138965647
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -171,8 +171,9 @@ public void open(Configuration parameters) throws 
Exception {
super.open(parameters);
 
// check and pass the configuration properties
-   KinesisProducerConfiguration producerConfig = 
KinesisConfigUtil.validateProducerConfiguration(configProps);
+   KinesisProducerConfiguration producerConfig = 
KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
+   
producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
--- End diff --

On second thought: although I believe the POOLED model is the best option for 
most of the use cases I can think of, we should give users the flexibility to 
make that decision themselves.

Adding the PER_REQUEST model as an option.


---


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166633#comment-16166633
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
The local build works fine. If by the compatibility plugin you mean `mvn verify`, 
that also worked with no issues.


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Mikhail Lipkovich
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // do something silly just do get iteration going ...
> val result = env.fromElements(1, 2, 3).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
> })
> result.print()
> env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>   at 
> org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at 
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at 
> org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just about the simplest iterating stream setup I could imagine, this 
> error makes no sense to me :-P
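A hedged workaround sketch (Java DataStream API; the class name and the 
setParallelism(1) choice are mine, not the reporter's): fromElements() always runs 
with parallelism 1, while the feedback operators default to the environment 
parallelism (8 here), so pinning the environment to 1 makes the feedback edge match 
the source.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Workaround sketch only; the underlying bug is what this ticket is about.
public final class IterateOnElements {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // make the default parallelism match the non-parallel source

        IterativeStream<Integer> it = env.fromElements(1, 2, 3).iterate();
        DataStream<Integer> feedback = it
                .filter(x -> x > 0)
                .map(x -> x - 1)
                .returns(Integer.class); // type hint for the lambda
        it.closeWith(feedback);          // feedback parallelism now matches the source
        feedback.print();

        env.execute("iterate-on-elements");
    }
}
{code}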



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4655: [FLINK-7567]: Removed keepPartitioning parameter from ite...

2017-09-14 Thread mlipkovich
Github user mlipkovich commented on the issue:

https://github.com/apache/flink/pull/4655
  
The local build works fine. If by the compatibility plugin you mean `mvn verify`, 
that also worked with no issues.


---


[jira] [Commented] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata

2017-09-14 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166576#comment-16166576
 ] 

Elias Levy commented on FLINK-4814:
---

Yes please.  That would be helpful.  I would suggest multiple levels of 
overriding.  E.g. the cluster config can provide defaults, as it does now for 
some things. You should be able to package a config file in your job's 
resources that overrides the cluster's defaults.  You should also be able to 
pass the job a config file at run time to override values in both of those.  
And command line arguments and programmatically set values should override 
everything else.

It would be good for the job to print its configuration on start up to aid 
debugging of settings, like the Kafka producer/consumer do.

Also, as I mentioned in the mailing list, the mini cluster used in the local 
execution environment doesn't load a config file, so there are things, like 
external checkpoints, that can't be tested at the moment from within an IDE.

API-wise it could also be more consistent.  At the moment you configure some 
settings via the stream environment (e.g. 
{{env.setStreamTimeCharacteristic()}}), others via {{env.getConfig.foo}}, more 
via {{env.getCheckpointConfig.bar}}, and some only via the config file.

> Remove extra storage location for externalized checkpoint metadata
> --
>
> Key: FLINK-4814
> URL: https://issues.apache.org/jira/browse/FLINK-4814
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Follow up for FLINK-4512.
> Store checkpoint meta data in checkpoint directory.  That makes it simpler 
> for users to track and clean up checkpoints manually, if they want to retain 
> externalized checkpoints across cancellations and terminal failures.
> Every state backend needs to be able to provide a storage location for the 
> checkpoint metadata. The memory state backend would hence not work with 
> externalized checkpoints, unless one sets explicitly a parameter 
> `setExternalizedCheckpointsLocation(uri)`.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4658: [Flink-7596][Table API & SQL] fix bug when Set Operation ...

2017-09-14 Thread Xpray
Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/4658
  
@twalthr, I've rewritten the methods, please have a look.


---


[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2017-09-14 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166527#comment-16166527
 ] 

Steve Loughran commented on FLINK-7589:
---

bq.  is there any plan to make Flink use AWS S3 SDK directly rather than 
Hadoop's S3 implementation?

The topic has surfaced. Note that if you do want a client which is resilient to 
transient failures, that's a fair amount of extra work, and you have to 
implement a fault-injecting wrapper on the AWS SDK to actually verify that it 
works in a functional test suite... that's something to consider shipping so 
that people can use it in their own integration tests.

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-14 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/4665
  
@aljoscha I agree the name should be "numLateElementsDropped". And do you mean 
that my count should subtract the number of elements that go to the side output, 
i.e. the skipped and late elements?


---


[GitHub] flink pull request #4673: [hotfix] [cep] Fix afterMatchStrategy parameter mi...

2017-09-14 Thread yestinchen
GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4673

[hotfix] [cep] Fix afterMatchStrategy parameter missing issue



## What is the purpose of the change
Fix the afterMatchSkipStrategy parameter that was missing when calling the 
`nfa.process()` function. The issue may have been introduced while resolving merge 
conflicts.

## Brief change log


## Verifying this change

This change added tests and can be verified as follows:
Appends an after-match test case in `CEPITCase` to catch this kind of problem.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink aftermatch-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4673


commit 060dcde4bd9662c98ade061c1698fcff5bb702b3
Author: Yestin <873915...@qq.com>
Date:   2017-09-14T16:01:53Z

[hotfix] [cep] Fix afterMatchStrategy parameter missing issue

This issue may be introduced during merging conflicts.




---


[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2017-09-14 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166515#comment-16166515
 ] 

Bowen Li commented on FLINK-7589:
-

[~aljoscha] Yes, the job restarted successfully.

Just curious - is there any plan to make Flink use AWS S3 SDK directly rather 
than Hadoop's S3 implementation?

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4658: [Flink-7596][Table API & SQL] fix bug when Set Operation ...

2017-09-14 Thread Xpray
Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/4658
  
What about overriding the `leastRestrictiveSqlType` method in 
`FlinkTypeFactory`?


---


[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-14 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166443#comment-16166443
 ] 

Erik van Oosten commented on FLINK-4796:


I am not sure why this is marked as a duplicate. The problem here is 
inconsistent handling of the runtime context inside the different layers under 
FlinkKafkaProducer: method {{getRuntimeContext}} gives {{null}} even though 
{{setRuntimeContext}} was called.

How does that relate to the addition of a new interface?

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 
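A rough sketch of the kind of interface the ticket describes (the names are 
illustrative, not the final API): the sink receives a per-record context from which 
it can read metadata such as the element's event timestamp.

{code:java}
// Sketch only: a sink variant that, similar to ProcessFunction, gets a context
// object per record instead of just the value.
public interface ContextAwareSinkFunction<IN> extends java.io.Serializable {

    /** Per-record metadata handed to the sink by the runtime. */
    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp(); // event timestamp of the element, possibly null
    }

    void invoke(IN value, Context context) throws Exception;
}
{code}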



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4658: [Flink-7596][Table API & SQL] fix bug when Set Operation ...

2017-09-14 Thread Xpray
Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/4658
  
Thanks for your advice. I am considering a better way to solve this.


---


[jira] [Commented] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166428#comment-16166428
 ] 

Aljoscha Krettek commented on FLINK-4814:
-

Yes I think this is still a problem and I think it's part of a larger problem 
that I'm trying to address in this one-pager: 
https://docs.google.com/document/d/1Q8hkpl2VVXrbEaJkQNe88kuus0F5wFFiebub9hqrg1s/edit?usp=sharing

I'm just laying out what I would like to have, no solutions yet, though. What 
do you think?

> Remove extra storage location for externalized checkpoint metadata
> --
>
> Key: FLINK-4814
> URL: https://issues.apache.org/jira/browse/FLINK-4814
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Follow up for FLINK-4512.
> Store checkpoint meta data in checkpoint directory.  That makes it simpler 
> for users to track and clean up checkpoints manually, if they want to retain 
> externalized checkpoints across cancellations and terminal failures.
> Every state backend needs to be able to provide a storage location for the 
> checkpoint metadata. The memory state backend would hence not work with 
> externalized checkpoints, unless one sets explicitly a parameter 
> `setExternalizedCheckpointsLocation(uri)`.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4047) Fix documentation about determinism of KeySelectors

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166401#comment-16166401
 ] 

ASF GitHub Bot commented on FLINK-4047:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4659#discussion_r138918167
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java 
---
@@ -36,7 +37,7 @@
 public interface KeySelector<IN, KEY> extends Function, Serializable {
 
/**
-* User-defined function that extracts the key from an arbitrary object.
+* User-defined function that extracts the key from an deterministic 
object.
--- End diff --

I think this should be `User-defined function that deterministically 
extracts the key from an object.`


> Fix documentation about determinism of KeySelectors
> ---
>
> Key: FLINK-4047
> URL: https://issues.apache.org/jira/browse/FLINK-4047
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.0, 1.0.3
>Reporter: Fabian Hueske
>Priority: Blocker
>  Labels: starter
> Fix For: 1.4.0
>
>
> KeySelectors must return deterministic keys, i.e., if invoked multiple times 
> on the same object, the returned key must be the same.
> The documentation about this aspect is broken ("The key can be of any type 
> and be derived from arbitrary computations.").
> We need to fix the JavaDocs of the {{KeySelector}} interface and the web 
> documentation 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#specifying-keys).
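A small illustration of the documented contract (the example types are mine): the 
key must be a pure function of the element, so repeated invocations on the same 
object return the same key.

{code:java}
import org.apache.flink.api.java.functions.KeySelector;

// Deterministic key extraction: the key depends only on the input element.
public class WordKeySelector implements KeySelector<String, String> {

    @Override
    public String getKey(String value) {
        return value.toLowerCase();          // deterministic: same input, same key
        // return value + System.nanoTime(); // NOT allowed: different key on each call
    }
}
{code}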



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4659: [FLINK-4047] Fix documentation about determinism o...

2017-09-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4659#discussion_r138918167
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java 
---
@@ -36,7 +37,7 @@
 public interface KeySelector<IN, KEY> extends Function, Serializable {
 
/**
-* User-defined function that extracts the key from an arbitrary object.
+* User-defined function that extracts the key from an deterministic 
object.
--- End diff --

I think this should be `User-defined function that deterministically 
extracts the key from an object.`


---


[jira] [Closed] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-14 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-4796.
---
Resolution: Duplicate

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166394#comment-16166394
 ] 

Aljoscha Krettek commented on FLINK-4796:
-

Ah I see, the workaround is for the NPE. 

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166391#comment-16166391
 ] 

Aljoscha Krettek commented on FLINK-4796:
-

Sorry, I didn't get what this workaround is for. Could you please go into a bit 
more detail?

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-4796) Add new Sink interface with access to more meta data

2017-09-14 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-4796:
---

Assignee: Aljoscha Krettek

> Add new Sink interface with access to more meta data
> 
>
> Key: FLINK-4796
> URL: https://issues.apache.org/jira/browse/FLINK-4796
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>
> The current {{SinkFunction}} cannot access the timestamps of elements which 
> resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other 
> limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} 
> and not a {{SinkFunction}}.
> We should add a new interface for sinks that takes a context parameter, 
> similar to {{ProcessFunction}}. This will allow sinks to query additional 
> meta data about the element that they're receiving. 
> This is one ML thread where a user ran into a problem caused by this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635
> h3. Original Text (that is still valid but not general)
> The Kafka 0.10 connector supports writing event timestamps to Kafka.
> Currently, the regular DataStream APIs don't allow user code to access the 
> event timestamp easily. That's why the Kafka connector is using a custom 
> operator ({{transform()}}) to access the event time.
> With this JIRA, I would like to provide the event timestamp in the regular 
> DataStream APIs.
> Once I'll look into the issue, I'll post some proposals how to add the 
> timestamp. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166387#comment-16166387
 ] 

Aljoscha Krettek commented on FLINK-2646:
-

I thought about this and the new {{closeAfterFailure()}} should not actually 
call {{close()}} by default because this would force implementations that only 
want "close" behaviour, i.e. "close successfully", to also implement an empty 
{{closeAfterFailure()}} to prevent the default {{close()}} invocation. This can 
easily be forgotten and would lead to wrong results.

> Rich functions should provide a method "closeAfterFailure()"
> 
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
> Fix For: 1.0.0
>
>
> Right now, the {{close()}} method of rich functions is invoked in case of 
> proper completion, and in case of canceling or errors (to allow for 
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is then changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API 
> breaking.
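A sketch of the proposal as described here (not Flink's actual API); note that the 
comment above argues against the close()-by-default delegation.

{code:java}
// Sketch only: closeAfterFailure() delegates to close() by default, so existing
// functions keep their behaviour unless they override the failure path.
public abstract class SketchRichFunction {

    /** Called by the runtime after a successful run. */
    public void close() throws Exception {
    }

    /** Called by the runtime after cancellation or failure; defaults to close(). */
    public void closeAfterFailure() throws Exception {
        close();
    }
}
{code}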



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166370#comment-16166370
 ] 

Aljoscha Krettek commented on FLINK-7589:
-

Yes, I think we cannot do much except wait here.

[~phoenixjiangnan] This was a transient failure and a subsequent restore 
succeeded, right?

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166369#comment-16166369
 ] 

ASF GitHub Bot commented on FLINK-7617:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4668
  
Hi @KurtYoung, do you want to merge this to the `release-1.3` branch as 
well?


> Remove string format in BitSet to improve the performance of 
> BuildSideOuterjoin
> ---
>
> Key: FLINK-7617
> URL: https://issues.apache.org/jira/browse/FLINK-7617
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> When using BuildSideOuterjoin, Bitset.set and get are called frequently, and each 
> call runs
> Preconditions.checkArgument(index >= 0,
> String.format("Input Index [%d] is larger than BitSet available size [%d].", 
> index, bitLength));
> The eagerly evaluated String.format in this check leads to a sharp performance decline.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4668: [FLINK-7617] Remove string format in BitSet to improve th...

2017-09-14 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4668
  
Hi @KurtYoung, do you want to merge this to the `release-1.3` branch as 
well?


---


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166365#comment-16166365
 ] 

ASF GitHub Bot commented on FLINK-7567:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
You can try and run the build locally to have a look at the file generated 
by the compatibility plugin. Then we can figure out why it's complaining.


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Mikhail Lipkovich
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // do something silly just do get iteration going ...
> val result = env.fromElements(1, 2, 3).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
> })
> result.print()
> env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>   at 
> org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at 
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at 
> org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just about the simplest iterating stream setup I could imagine, this 
> error makes no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4655: [FLINK-7567]: Removed keepPartitioning parameter from ite...

2017-09-14 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4655
  
You can try and run the build locally to have a look at the file generated 
by the compatibility plugin. Then we can figure out why it's complaining.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-14 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r138905728
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
 ---
@@ -218,7 +221,7 @@ public long addRecord(T record) throws IOException {
return offset;
}

-   public MutableObjectIterator<T> 
finishWriteAndSortKeys(List<MemorySegment> memory) throws IOException {
+   public MutableObjectIterator<T> 
finishWriteAndSortKeys(List<MemorySegment> memory) throws IOException, 
IllegalAccessException, TemplateException, InstantiationException, 
CompileException, InvocationTargetException, NoSuchMethodException, 
ClassNotFoundException {
--- End diff --

Or, maybe it's even better to catch all of these in the sorter factory method 
(`SorterFactory.createSorter`) and create a non-codegen sorter when any of 
these exceptions happen (and log a warning).
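A hedged sketch of that fallback shape; the names below are illustrative and not the 
actual methods touched by this PR.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: try the code-generated variant first; on any failure, log a
// warning and fall back to the generic (non-codegen) implementation.
public final class FallbackFactory {

    private static final Logger LOG = LoggerFactory.getLogger(FallbackFactory.class);

    public static <T> T createWithFallback(Callable<T> codegenVariant, Supplier<T> defaultVariant) {
        try {
            return codegenVariant.call();
        } catch (Exception e) {
            LOG.warn("Code generation failed, falling back to the non-codegen implementation.", e);
            return defaultVariant.get();
        }
    }
}
```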


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-14 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r138904204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 ---
@@ -115,6 +115,13 @@ public Configuration getConfiguration() {
return tmpDirectories;
}
 
+   public String getFirstTmpDirectory(){
--- End diff --

We can remove this method, since it became unused when we changed to 
generating the code into just a string instead of a temporary file.


---


[jira] [Assigned] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin

2017-09-14 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-7617:
---

Assignee: Jingsong Lee

> Remove string format in BitSet to improve the performance of 
> BuildSideOuterjoin
> ---
>
> Key: FLINK-7617
> URL: https://issues.apache.org/jira/browse/FLINK-7617
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>
> When using BuildSideOuterjoin, Bitset.set and get are called frequently, and each 
> call runs
> Preconditions.checkArgument(index >= 0,
> String.format("Input Index [%d] is larger than BitSet available size [%d].", 
> index, bitLength));
> The eagerly evaluated String.format in this check leads to a sharp performance decline.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7607) Web Frontend Hangs with Large Numbers of Tasks

2017-09-14 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7607:

Fix Version/s: 1.4.0

> Web Frontend Hangs with Large Numbers of Tasks
> --
>
> Key: FLINK-7607
> URL: https://issues.apache.org/jira/browse/FLINK-7607
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.3.2
> Environment: Attempted to load the web frontend on a MacBook Pro 15" 
> (late 2016) with 16 GB of memory using both Chrome 60.0 and Safari 10.1.2.
>Reporter: Joshua Griffith
>  Labels: performance
> Fix For: 1.4.0
>
>
> Viewing a job with a high number of tasks in the web front-end causes the 
> page to hang, consuming 100% CPU on a core. At 200 tasks the page slows 
> noticeably and scrolling results in long, non-responsive pauses. At 400 tasks 
> the page only updates once per minute and is almost entirely non-responsive.
> Initially, I thought this was caused by rendering a complex job graph but 
> opening the inspector and deleting the canvas did not improve page 
> performance. Further inspection indicated that the page was redrawing every 
> DOM element in the task list on every update.
> A possible solution is to use an approach similar to 
> [react-list|https://github.com/orgsync/react-list] and only request 
> data/render list items that are in view and only update DOM nodes that have 
> changed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166340#comment-16166340
 ] 

Aljoscha Krettek commented on FLINK-7606:
-

Shouldn't NFAs be cleared once the allowed "within" time is over? If this is 
not the case and NFA state is in fact never cleaned up, then this is a leak 
and will lead to problems.

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without ever freeing the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory still grows (I suppose the NFA state 
> is updated) but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If needed, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.
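
For reference, a minimal sketch of the kind of pattern described above, assuming {{messages}} is the incoming DataStream (the event type, key field, and conditions are made up for illustration; the relevant parts are the keying and the 5-minute "within" clause):

{code:java}
Pattern<Message, ?> pattern = Pattern.<Message>begin("first")
	.where(new SimpleCondition<Message>() {
		@Override
		public boolean filter(Message msg) {
			return "START".equals(msg.getType());
		}
	})
	.followedBy("second")
	.where(new SimpleCondition<Message>() {
		@Override
		public boolean filter(Message msg) {
			return "END".equals(msg.getType());
		}
	})
	.within(Time.minutes(5));

PatternStream<Message> matches = CEP.pattern(messages.keyBy("key"), pattern);
{code}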



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7606) CEP operator leaks state

2017-09-14 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7606:

Summary: CEP operator leaks state  (was: Memory leak on 
NestedMapsStateTable)

> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without ever freeing the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> The "within" clause didn't help either: we've seen that if we send messages 
> that don't match the pattern, the memory still grows (I suppose the NFA state 
> is updated) but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If needed, I can provide more details about the job we've implemented and 
> also screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-14 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r138899420
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
 ---
@@ -218,7 +221,7 @@ public long addRecord(T record) throws IOException {
return offset;
}

-   public MutableObjectIterator<T> finishWriteAndSortKeys(List<MemorySegment> memory) throws IOException {
+   public MutableObjectIterator<T> finishWriteAndSortKeys(List<MemorySegment> memory) throws IOException, IllegalAccessException, TemplateException, InstantiationException, CompileException, InvocationTargetException, NoSuchMethodException, ClassNotFoundException {
--- End diff --

I'm thinking that we should encapsulate all these exceptions into one 
`SorterCodegenException` (or just `CodegenException`) and throw only that from 
the sorter factory method, to avoid having to declare this litany of exceptions 
(the same list occurs in multiple places).
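
For example, something along these lines (just a sketch; the names follow the proposal above, the surrounding factory code is assumed):

```
public class SorterCodegenException extends Exception {
	public SorterCodegenException(String message, Throwable cause) {
		super(message, cause);
	}
}

// in the sorter factory method, so callers only have to declare one checked exception:
try {
	sorter = instantiateGeneratedSorter();  // placeholder for the actual code-generation call
} catch (IllegalAccessException | TemplateException | InstantiationException
		| CompileException | InvocationTargetException | NoSuchMethodException
		| ClassNotFoundException e) {
	throw new SorterCodegenException("Could not instantiate the code-generated sorter.", e);
}
```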


---


[jira] [Updated] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout

2017-09-14 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7590:

Priority: Major  (was: Blocker)

> Flink failed to flush and close the file system output stream for 
> checkpointing because of s3 read timeout
> --
>
> Key: FLINK-7590
> URL: https://issues.apache.org/jira/browse/FLINK-7590
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Flink job failed once over the weekend because of the following issue. It 
> picked itself up afterwards and has been running well. But the issue might 
> be worth taking a look at.
> {code:java}
> 2017-09-03 13:18:38,998 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce 
> (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 163 for operator reduce (14/18).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 163 for 
> operator reduce (14/18).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the 
> stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not flush and close the file system output stream 
> to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain 
> the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 7 more
>   Caused by: java.io.IOException: Could not flush and close the file 
> system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa 
> in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> 

[jira] [Commented] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166333#comment-16166333
 ] 

Aljoscha Krettek commented on FLINK-7590:
-

Agreed!

> Flink failed to flush and close the file system output stream for 
> checkpointing because of s3 read timeout
> --
>
> Key: FLINK-7590
> URL: https://issues.apache.org/jira/browse/FLINK-7590
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Flink job failed once over the weekend because of the following issue. It 
> picked itself up afterwards and has been running well. But the issue might 
> be worth taking a look at.
> {code:java}
> 2017-09-03 13:18:38,998 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce 
> (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 163 for operator reduce (14/18).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 163 for 
> operator reduce (14/18).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the 
> stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not flush and close the file system output stream 
> to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain 
> the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 7 more
>   Caused by: java.io.IOException: Could not flush and close the file 
> system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa 
> in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>   at 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166323#comment-16166323
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138879343
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   final List segments = new 
ArrayList<>(numRequiredBuffers);
-   for (int i = 0 ; i < numRequiredBuffers ; i++) {
-   segments.add(availableMemorySegments.poll());
-   }
+   redistributeBuffers();
+   }
 
+   final List segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
-   redistributeBuffers();
-   } catch (IOException e) {
-   if (segments.size() > 0) {
-   recycleMemorySegments(segments);
-   }
-
+   segments.add(availableMemorySegments.take());
--- End diff --

I know, I was the one who suggested it, but thinking about the blocking 
`take()` a bit more, and with some more background I acquired over the last 
weeks, I'm getting the feeling that we should do the request similarly to 
`LocalBufferPool#requestBuffer()`, so that if (for some reason) we are waiting 
forever, we can at least be stopped by the `destroy()` method being called. 
Or what do you think? I'm thinking about something like this:
```
final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
try {
while (segments.size() < numRequiredBuffers) {
if (isDestroyed) {
throw new IllegalStateException("Buffer 
pool is destroyed.");
}

final MemorySegment segment = 
availableMemorySegments.poll(2, TimeUnit.SECONDS);
if (segment != null) {
segments.add(segment);
}
}
} catch (Throwable e) {
recycleMemorySegments(segments);
ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this 
behaviour:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, 
verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
final int numBuffers = 10;

NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
MemorySegment segment = globalPool.requestMemorySegment();
assertNotNull(segment);

final OneShotLatch isRunning = new OneShotLatch();
CheckedThread asyncRequest = new CheckedThread() {
@Override
public void go() throws Exception {
isRunning.trigger();
globalPool.requestMemorySegments(10);
}
};
asyncRequest.start();

// We want the destroy call inside the blocking part of the 
globalPool.requestMemorySegments()
// call above. We cannot guarantee this though but make it 
highly probable:
isRunning.await();
Thread.sleep(10);
globalPool.destroy();

segment.free();

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("destroyed");
asyncRequest.sync();
}
```


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
> 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166322#comment-16166322
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138852758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
--- End diff --

for this test, `msg` is wrong as nothing has been recycled here (yet)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166324#comment-16166324
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138891582
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+   }
+
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently not containing the number of required free 

[jira] [Updated] (FLINK-7608) LatencyGauge change to histogram metric

2017-09-14 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7608:

Priority: Blocker  (was: Major)

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used the slf4jReporter [https://issues.apache.org/jira/browse/FLINK-4831] to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}
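
For comparison, reporting latency as a histogram on the user-facing metrics API looks roughly like this (just an illustration of Flink's Histogram metric type via the Dropwizard wrapper from flink-metrics-dropwizard, not the internal LatencyGauge change itself; variable names are made up):

{code:java}
com.codahale.metrics.Histogram dropwizardHistogram =
	new com.codahale.metrics.Histogram(new com.codahale.metrics.ExponentiallyDecayingReservoir());

Histogram latencyHistogram = getRuntimeContext()
	.getMetricGroup()
	.histogram("latency", new DropwizardHistogramWrapper(dropwizardHistogram));

// per record, inside the operator/function:
latencyHistogram.update(latencyInMillis);
{code}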



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166327#comment-16166327
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889161
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
--- End diff --

add `finally` with `globalPool.destroy()`
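
i.e. roughly (a sketch based on the test shown above):

```
try {
	memorySegments = globalPool.requestMemorySegments(numBuffers + 1);
	fail("Should throw an IOException");
} catch (IOException e) {
	assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
} finally {
	globalPool.destroy();
}
```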


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166329#comment-16166329
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138853431
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

you also need to call `NetworkBufferPool#destroyAllBufferPools()` or 
`LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release 
their buffers
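
e.g. (sketch):

```
// releases the buffers held by `first` and `second`, then the global pool itself
globalPool.destroyAllBufferPools();
globalPool.destroy();
```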


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166328#comment-16166328
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13514
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
--- End diff --

unnecessary check - see above


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166325#comment-16166325
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13882
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
--- End diff --

here, you should also destroy the `globalPool`, i.e. call 
`globalPool.destroy()`


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166331#comment-16166331
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138897087
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -333,7 +333,7 @@ public void 
updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
 
InputChannel current = inputChannels.get(partitionId);
 
-   if (current.getClass() == UnknownInputChannel.class) {
+   if (current instanceof UnknownInputChannel) {
--- End diff --

Just to be on the safe side, you should also change this check in 
`#setInputChannel()` above. This way, we handle all sub-classes of 
`UnknownInputChannel` the same way as `UnknownInputChannel` itself


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166330#comment-16166330
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
--- End diff --

add `finally` with `globalPool.destroy()`


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> 

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166326#comment-16166326
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13440
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);
--- End diff --

If there was an exception, `memorySegments` will _always_ be the 
`Collections.emptyList()` you set before, so there's no need to check for its 
size.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a 
> fixed-size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> The exclusive buffers are assigned to {{InputChannel}}s directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138891582
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
+   }
+
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently not containing the number of required free segments 
(currently occupied by a buffer pool).
+*/
+   @Test
+   public void testRequestMemorySegmentsWithBuffersTaken() throws 
IOException, InterruptedException {
+   final int numBuffers = 10;
+
+

[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889161
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List segments = null;
-   // request buffers from global pool with illegal argument
+   List memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
--- End diff --

add `finally` with `globalPool.destroy()`
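
For readers skimming the thread, a hedged sketch of what that could look like, reusing only names that appear in the quoted test (imports and static imports as in the existing test class; not necessarily the final code):
```java
@Test
public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
	final int numBuffers = 10;

	NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);

	try {
		// requesting more segments than the pool owns is expected to fail
		globalPool.requestMemorySegments(numBuffers + 1);
		fail("Should throw an IOException");
	} catch (IOException e) {
		assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
	} finally {
		// release the pool's memory even if one of the assertions above fails
		globalPool.destroy();
	}
}
```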


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13882
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List<MemorySegment> segments = null;
-   // request buffers from global pool with illegal argument
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
--- End diff --

here, you should also destroy the `globalPool`, i.e. call 
`globalPool.destroy()`


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138879343
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
 
this.numTotalRequiredBuffers += numRequiredBuffers;
 
-   final List<MemorySegment> segments = new 
ArrayList<>(numRequiredBuffers);
-   for (int i = 0 ; i < numRequiredBuffers ; i++) {
-   segments.add(availableMemorySegments.poll());
-   }
+   redistributeBuffers();
+   }
 
+   final List<MemorySegment> segments = new 
ArrayList<>(numRequiredBuffers);
+   for (int i = 0 ; i < numRequiredBuffers ; i++) {
try {
-   redistributeBuffers();
-   } catch (IOException e) {
-   if (segments.size() > 0) {
-   recycleMemorySegments(segments);
-   }
-
+   segments.add(availableMemorySegments.take());
--- End diff --

I know, I was the one who suggested it, but after thinking about the blocking 
`take()` a bit more, and with some more background I acquired over the last 
weeks, I'm getting the feeling that we should do the request similarly to 
`LocalBufferPool#requestBuffer()`, so that if (for some reason) we are waiting 
forever, we can at least be stopped by the `destroy()` function being called. 
What do you think? I'm thinking about something like this:
```
final ArrayList<MemorySegment> segments = new 
ArrayList<>(numRequiredBuffers);
try {
while (segments.size() < numRequiredBuffers) {
if (isDestroyed) {
throw new IllegalStateException("Buffer 
pool is destroyed.");
}

final MemorySegment segment = 
availableMemorySegments.poll(2, TimeUnit.SECONDS);
if (segment != null) {
segments.add(segment);
}
}
} catch (Throwable e) {
recycleMemorySegments(segments);
ExceptionUtils.rethrowIOException(e);
}
```
(using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this 
behaviour:
```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, 
verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
final int numBuffers = 10;

NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
MemorySegment segment = globalPool.requestMemorySegment();
assertNotNull(segment);

final OneShotLatch isRunning = new OneShotLatch();
CheckedThread asyncRequest = new CheckedThread() {
@Override
public void go() throws Exception {
isRunning.trigger();
globalPool.requestMemorySegments(10);
}
};
asyncRequest.start();

// We want the destroy call inside the blocking part of the 
globalPool.requestMemorySegments()
// call above. We cannot guarantee this though but make it 
highly probable:
isRunning.await();
Thread.sleep(10);
globalPool.destroy();

segment.free();

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("destroyed");
asyncRequest.sync();
}
```


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13514
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List<MemorySegment> segments = null;
-   // request buffers from global pool with illegal argument
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List<MemorySegment> memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
--- End diff --

unnecessary check - see above


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138897087
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ---
@@ -333,7 +333,7 @@ public void 
updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOE
 
InputChannel current = inputChannels.get(partitionId);
 
-   if (current.getClass() == UnknownInputChannel.class) {
+   if (current instanceof UnknownInputChannel) {
--- End diff --

Just to be on the safe side, you should also change this check in 
`#setInputChannel()` above. This way, we handle all sub-classes of 
`UnknownInputChannel` the same way as `UnknownInputChannel` itself.
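
A tiny, self-contained illustration of why the `instanceof` check behaves differently from the exact-class comparison for sub-classes (the classes here are placeholders, not the real Flink ones):
```java
class UnknownInputChannel {}
class SpecialUnknownInputChannel extends UnknownInputChannel {}

public class InstanceOfVsGetClass {
	public static void main(String[] args) {
		Object current = new SpecialUnknownInputChannel();

		// exact-class comparison rejects sub-classes
		System.out.println(current.getClass() == UnknownInputChannel.class); // false

		// instanceof accepts the class and all of its sub-classes
		System.out.println(current instanceof UnknownInputChannel);          // true
	}
}
```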


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138889183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List<MemorySegment> segments = null;
-   // request buffers from global pool with illegal argument
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
invalid argument to
+* cause exception.
+*/
+   @Test
+   public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
+   final int numBuffers = 10;
+
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+
+   List<MemorySegment> memorySegments = Collections.emptyList();
+   try {
+   // the number of requested buffers should be larger 
than zero
+   memorySegments = globalPool.requestMemorySegments(0);
+   fail("Should throw an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+   assertEquals(memorySegments.size(), 0);
+   
assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
+   }
--- End diff --

add `finally` with `globalPool.destroy()`


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138852758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List<MemorySegment> segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List<MemorySegment> segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List<MemorySegment> segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
--- End diff --

for this test, `msg` is wrong as nothing has been recycled here (yet)


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r13440
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 ---
@@ -172,44 +178,117 @@ public void testDestroyAll() {
}
}
 
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
{@link NetworkBufferPool}
+* currently containing the number of required free segments.
+*/
@Test
-   public void testRequestAndRecycleMemorySegments() throws Exception {
+   public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
final int numBuffers = 10;
 
NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   List<MemorySegment> segments = null;
-   // request buffers from global pool with illegal argument
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = globalPool.requestMemorySegments(0);
-   fail("Should throw an IllegalArgumentException");
-   } catch (IllegalArgumentException e) {
-   assertNull(segments);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers / 2);
+
+   assertEquals(memorySegments.size(), numBuffers / 2);
+   } finally {
+   globalPool.recycleMemorySegments(memorySegments);

assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers);
}
+   }
 
-   // common case to request buffers less than the total capacity 
of global pool
-   final int numRequiredBuffers = 8;
-   segments = globalPool.requestMemorySegments(numRequiredBuffers);
-
-   assertNotNull(segments);
-   assertEquals(segments.size(), numRequiredBuffers);
-
-   // recycle all the requested buffers to global pool
-   globalPool.recycleMemorySegments(segments);
+   /**
+* Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the 
number of required
+* buffers exceeding the capacity of {@link NetworkBufferPool}.
+*/
+   @Test
+   public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
+   final int numBuffers = 10;
 
-   assertEquals(globalPool.getNumberOfAvailableMemorySegments(), 
numBuffers);
+   NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
 
-   // uncommon case to request buffers exceeding the total 
capacity of global pool
+   List<MemorySegment> memorySegments = Collections.emptyList();
try {
-   segments = null;
-   segments = globalPool.requestMemorySegments(11);
+   memorySegments = 
globalPool.requestMemorySegments(numBuffers + 1);
fail("Should throw an IOException");
} catch (IOException e) {
-   assertNull(segments);
-   // recycle all the requested buffers to global pool 
after exception
+   assertEquals(memorySegments.size(), 0);
--- End diff --

If there was an exception, `memorySegments` will _always_ be the 
`Collections.emptyList()` you set before, so there's no need to check for its 
size.


---


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138853431
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -155,6 +156,57 @@ public void testUniformDistributionBounded3() throws 
IOException {
globalPool.destroy();
}
 
+   /**
+* Tests the interaction of requesting memory segments and creating 
local buffer pool and
+* verifies the number of assigned buffers match after redistributing 
buffers because of newly
+* requested memory segments or new buffer pools created.
+*/
+   @Test
+   public void testUniformDistributionBounded4() throws IOException {
+   NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+
+   BufferPool first = globalPool.createBufferPool(0, 10);
+   assertEquals(10, first.getNumBuffers());
+
+   List<MemorySegment> segmentList1 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList1.size());
+   assertEquals(8, first.getNumBuffers());
+
+   BufferPool second = globalPool.createBufferPool(0, 10);
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   List<MemorySegment> segmentList2 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList2.size());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   List<MemorySegment> segmentList3 = 
globalPool.requestMemorySegments(2);
+   assertEquals(2, segmentList3.size());
+   assertEquals(2, first.getNumBuffers());
+   assertEquals(2, second.getNumBuffers());
+
+   String msg = "Did not return all buffers to network buffer pool 
after test.";
+   assertEquals(msg, 4, 
globalPool.getNumberOfAvailableMemorySegments());
+
+   globalPool.recycleMemorySegments(segmentList1);
+   assertEquals(msg, 6, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(3, first.getNumBuffers());
+   assertEquals(3, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList2);
+   assertEquals(msg, 8, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(4, first.getNumBuffers());
+   assertEquals(4, second.getNumBuffers());
+
+   globalPool.recycleMemorySegments(segmentList3);
+   assertEquals(msg, 10, 
globalPool.getNumberOfAvailableMemorySegments());
+   assertEquals(5, first.getNumBuffers());
+   assertEquals(5, second.getNumBuffers());
+
+   globalPool.destroy();
--- End diff --

you also need to call `NetworkBufferPool#destroyAllBufferPools()` or 
`LocalBufferPool#lazyDestroy()` for `first` and `second` to properly release 
their buffers
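
A hedged sketch of the suggested teardown at the end of the quoted test (names are taken from the diff and this comment; whether to use `destroyAllBufferPools()` or the per-pool `lazyDestroy()` calls is left open):
```java
// after recycling segmentList1/2/3 (already in the test), also release the
// buffers held by the two local pools before destroying the global pool
first.lazyDestroy();
second.lazyDestroy();
// ...or, alternatively, in one call:
// globalPool.destroyAllBufferPools();

globalPool.destroy();
```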


---


[jira] [Created] (FLINK-7626) Add some metric description about checkpoints

2017-09-14 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-7626:
---

 Summary: Add some metric description about checkpoints
 Key: FLINK-7626
 URL: https://issues.apache.org/jira/browse/FLINK-7626
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Metrics
Affects Versions: 1.3.2
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.4.0, 1.3.3


Add some metric descriptions to the "Debugging & Monitoring / Metrics" part of the docs:

{noformat}

//Number of total checkpoints (in progress, completed, failed).
totalNumberOfCheckpoints

//Number of in-progress checkpoints.
numberOfInProgressCheckpoints

//Number of successfully completed checkpoints.
numberOfCompletedCheckpoints

//Number of failed checkpoints.
numberOfFailedCheckpoints

//Timestamp when the checkpoint was restored at the coordinator.
lastCheckpointRestoreTimestamp

//Buffered bytes during alignment over all subtasks.
lastCheckpointAlignmentBuffered
{noformat}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4658: [Flink-7596][Table API & SQL] fix bug when Set Ope...

2017-09-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4658#discussion_r138894127
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java
 ---
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.type;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeFamily;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlCollation;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.util.Glossary;
+import org.apache.calcite.util.Util;
+import org.apache.flink.table.plan.schema.GenericRelDataType;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * SqlTypeFactoryImpl provides a default implementation of
+ * {@link RelDataTypeFactory} which supports SQL types.
+ */
+public class SqlTypeFactoryImpl extends RelDataTypeFactoryImpl {
+   //~ Constructors 
---
+
+   public SqlTypeFactoryImpl(RelDataTypeSystem typeSystem) {
+   super(typeSystem);
+   }
+
+   //~ Methods 

+
+   // implement RelDataTypeFactory
+   public RelDataType createSqlType(SqlTypeName typeName) {
+   if (typeName.allowsPrec()) {
+   return createSqlType(typeName, 
typeSystem.getDefaultPrecision(typeName));
+   }
+   assertBasic(typeName);
+   RelDataType newType = new BasicSqlType(typeSystem, typeName);
+   return canonize(newType);
+   }
+
+   // implement RelDataTypeFactory
+   public RelDataType createSqlType(
+   SqlTypeName typeName,
+   int precision) {
+   final int maxPrecision = typeSystem.getMaxPrecision(typeName);
+   if (maxPrecision >= 0 && precision > maxPrecision) {
+   precision = maxPrecision;
+   }
+   if (typeName.allowsScale()) {
+   return createSqlType(typeName, precision, 
typeName.getDefaultScale());
+   }
+   assertBasic(typeName);
+   assert (precision >= 0)
+   || (precision == RelDataType.PRECISION_NOT_SPECIFIED);
+   RelDataType newType = new BasicSqlType(typeSystem, typeName, 
precision);
+   newType = SqlTypeUtil.addCharsetAndCollation(newType, this);
+   return canonize(newType);
+   }
+
+   // implement RelDataTypeFactory
+   public RelDataType createSqlType(
+   SqlTypeName typeName,
+   int precision,
+   int scale) {
+   assertBasic(typeName);
+   assert (precision >= 0)
+   || (precision == RelDataType.PRECISION_NOT_SPECIFIED);
+   final int maxPrecision = typeSystem.getMaxPrecision(typeName);
+   if (maxPrecision >= 0 && precision > maxPrecision) {
+   precision = maxPrecision;
+   }
+   RelDataType newType =
+   new BasicSqlType(typeSystem, typeName, precision, 
scale);
+   newType = SqlTypeUtil.addCharsetAndCollation(newType, this);
+   return canonize(newType);
+   }
+
+   // implement RelDataTypeFactory
+   public RelDataType createMultisetType(
+   RelDataType type,
+   long maxCardinality) {
+   assert maxCardinality == -1;
+   RelDataType newType = new MultisetSqlType(type, false);
+   return canonize(newType);
+   }
+
+   public RelDataType createArrayType(
+   RelDataType elementType,
+   long maxCardinality) {
+   assert 

[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-14 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4665
  
These are not actually the places where you can count dropped data. I would 
suggest adding this at the very end of `processElement()`, where we also check 
whether we should side-output late data: 
https://github.com/apache/flink/blob/6642768ad8f8c5d1856742a6d148f7724c20666c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L406.

Also, I think a better metric name would include "dropped" somehow, 
because right now it just says "late". Something like `numLateElementsDropped`.
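
A hedged fragment of what that could look like inside the operator; the field, the `open()` wiring, and the late-data branch below are assumptions based on the linked `processElement()` code, not the actual implementation:
```java
private transient Counter numLateElementsDropped;

@Override
public void open() throws Exception {
	super.open();
	// org.apache.flink.metrics.Counter registered on the operator's metric group
	numLateElementsDropped = getMetricGroup().counter("numLateElementsDropped");
}

// ...at the very end of processElement(), next to the existing late-data handling:
if (isSkippedElement && isElementLate(element)) {
	if (lateDataOutputTag != null) {
		sideOutput(element);
	} else {
		// the element is silently dropped as late, so count it here
		numLateElementsDropped.inc();
	}
}
```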


---


[jira] [Closed] (FLINK-6331) StreamingOperatorsITCase.testAsyncWaitOperator spuriously failing on Travis

2017-09-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-6331.

Resolution: Cannot Reproduce

Could not reproduce the problem, so I will close the issue. If this test 
failure reappears, we should re-open it.

> StreamingOperatorsITCase.testAsyncWaitOperator spuriously failing on Travis
> ---
>
> Key: FLINK-6331
> URL: https://issues.apache.org/jira/browse/FLINK-6331
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: test-stability
>
> The {{StreamingOperatorsITCase.testAsyncWaitOperator}} fails sometimes on 
> Travis.
> [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/223541348/log.txt



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5205) Asynchronous remove job messages can interfere with recovered or resubmitted jobs

2017-09-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-5205.

Resolution: Won't Fix

This does not seem to be a problem, and it becomes obsolete with the FLIP-6 changes anyway.

> Asynchronous remove job messages can interfere with recovered or resubmitted 
> jobs
> -
>
> Key: FLINK-5205
> URL: https://issues.apache.org/jira/browse/FLINK-5205
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> The {{JobManager}} executes the {{removeJob}} call in various places 
> asynchronously via the {{RemoveJob}} message, even though this is not 
> strictly needed and could be done synchronously. The 
> advantage would be that the remove-job operation won't interfere with 
> recovered or resubmitted jobs which have the same {{JobID}} as the job for 
> which the message was initially triggered.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-2733) ZooKeeperLeaderElectionTest.testZooKeeperReelection fails

2017-09-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-2733.

Resolution: Not A Problem

> ZooKeeperLeaderElectionTest.testZooKeeperReelection fails
> -
>
> Key: FLINK-2733
> URL: https://issues.apache.org/jira/browse/FLINK-2733
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>  Labels: test-stability
>
> I observed a test failure in this run: 
> https://travis-ci.org/rmetzger/flink/jobs/81571914
> {code}
> testZooKeeperReelection(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest)
>   Time elapsed: 109.794 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but 
> was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelection(ZooKeeperLeaderElectionTest.java:171)
> Results :
> Failed tests: 
>   ZooKeeperLeaderElectionTest.testZooKeeperReelection:171 
> expected: but was:
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-2733) ZooKeeperLeaderElectionTest.testZooKeeperReelection fails

2017-09-14 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166301#comment-16166301
 ] 

Till Rohrmann commented on FLINK-2733:
--

I think [~tonycox] did not actually use the fixed code when observing the test 
failure, so I will close the issue.

> ZooKeeperLeaderElectionTest.testZooKeeperReelection fails
> -
>
> Key: FLINK-2733
> URL: https://issues.apache.org/jira/browse/FLINK-2733
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Till Rohrmann
>  Labels: test-stability
>
> I observed a test failure in this run: 
> https://travis-ci.org/rmetzger/flink/jobs/81571914
> {code}
> testZooKeeperReelection(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest)
>   Time elapsed: 109.794 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but 
> was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelection(ZooKeeperLeaderElectionTest.java:171)
> Results :
> Failed tests: 
>   ZooKeeperLeaderElectionTest.testZooKeeperReelection:171 
> expected: but was:
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4672: [Flink-7621][TableAPI & SQL] Fix Inconsistency of CaseSen...

2017-09-14 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4672
  
Thanks @Xpray. The changes look good, but I think if we really want to have 
the correct behavior, we need to make more changes in different places. Could 
you add some more tests to check case-insensitive behavior for tables, 
functions, and fields in different programs?


---


[jira] [Assigned] (FLINK-2253) ALS fails if the data flow is split up and the input is non-deterministic

2017-09-14 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-2253:


Assignee: (was: Till Rohrmann)

> ALS fails if the data flow is split up and the input is non-deterministic
> -
>
> Key: FLINK-2253
> URL: https://issues.apache.org/jira/browse/FLINK-2253
> Project: Flink
>  Issue Type: Bug
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML
>
> If the {{temporaryPath}} of Flink's {{ALS}} implementation is set, then the 
> ALS job is executed one part after the other. This also includes the routing 
> information calculation for the user and item blocks. If the input 
> {{DataSet}} is calculated non-deterministically, e.g. if it's the result of a 
> {{first}} operation, then the resulting routing information won't be 
> consistent and ALS fails.
> A solution would be to pin the input {{DataSet}} so that it will only be 
> executed once. Until we have this functionality, I propose to simply execute 
> the user and item routing information at the same time.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7618) Add BINARY supported in FlinkTypeFactory

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166294#comment-16166294
 ] 

ASF GitHub Bot commented on FLINK-7618:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4669
  
Thanks @sunjincheng121, but we cannot merge this without tests. Adding a new 
type should come with tests, especially because you have to add this type in 
different places, e.g. in `FlinkRelNode` and in the code generator as well.


> Add BINARY supported in FlinkTypeFactory
> 
>
> Key: FLINK-7618
> URL: https://issues.apache.org/jira/browse/FLINK-7618
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> We will get the following exception when we deal with the BINARY type.
> {code}
> org.apache.flink.table.api.TableException: Type is not supported: BINARY
>   at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>   at 
> org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:377)
>   at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:741)
>   at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:104)
>   at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
>   at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4669: [FLINK-7618][table] Add BINARY supported in FlinkTypeFact...

2017-09-14 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4669
  
Thanks @sunjincheng121, but we cannot merge this without tests. Adding a new 
type should come with tests, especially because you have to add this type in 
different places, e.g. in `FlinkRelNode` and in the code generator as well.


---


[jira] [Commented] (FLINK-7617) Remove string format in BitSet to improve the performance of BuildSideOuterjoin

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166291#comment-16166291
 ] 

ASF GitHub Bot commented on FLINK-7617:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4668


> Remove string format in BitSet to improve the performance of 
> BuildSideOuterjoin
> ---
>
> Key: FLINK-7617
> URL: https://issues.apache.org/jira/browse/FLINK-7617
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Jingsong Lee
>
> When using BuildSideOuterJoin, BitSet.set and get are called frequently. Each 
> call runs a
> Preconditions.checkArgument(...) check whose error message is built with
> String.format("Input Index [%d] is larger than BitSet available size [%d].", 
> index, bitLength);
> this String.format call leads to a sharp performance decline.
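
A hedged, self-contained illustration of the idea: do a plain comparison on the hot path and build the error message only when the check actually fails, instead of paying for String.format on every set/get call (this is a standalone sketch, not the actual Flink BitSet):
{code}
final class SimpleBitSet {
	private final byte[] bytes;
	private final int bitLength;

	SimpleBitSet(int bitLength) {
		this.bitLength = bitLength;
		this.bytes = new byte[(bitLength + 7) >>> 3];
	}

	void set(int index) {
		checkIndex(index);
		bytes[index >>> 3] |= (byte) (1 << (index & 7));
	}

	boolean get(int index) {
		checkIndex(index);
		return (bytes[index >>> 3] & (1 << (index & 7))) != 0;
	}

	private void checkIndex(int index) {
		// the message string is only built on the failure path
		if (index < 0 || index >= bitLength) {
			throw new IllegalArgumentException(
				"Input Index [" + index + "] is larger than BitSet available size [" + bitLength + "].");
		}
	}
}
{code}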



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4668: [FLINK-7617] Remove string format in BitSet to imp...

2017-09-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4668


---


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166289#comment-16166289
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r138889015
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 canonize(relType)
   }
 
+  override def createMultisetType(elementType: RelDataType, 
maxCardinality: Long): RelDataType = {
+val relType = new MultisetRelDataType(
--- End diff --

There are multiple locations where a new type has to be added, e.g. 
`FlinkRelNode`.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166288#comment-16166288
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r13586
  
--- Diff: flink-core/pom.xml ---
@@ -80,6 +80,13 @@ under the License.


 
+   
+   
+   org.apache.commons
--- End diff --

We should not add additional dependencies to Flink just because of a new 
data type. There is also no reason given for choosing this particular library. 
Couldn't we just use a plain Java Map? Otherwise I would propose adding a 
class for our own type, like we did for `org.apache.flink.types.Row`. Calcite 
is using `List`, which is not very nice, but would also work.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

2017-09-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r13586
  
--- Diff: flink-core/pom.xml ---
@@ -80,6 +80,13 @@ under the License.


 
+   
+   
+   org.apache.commons
--- End diff --

We should not add additional dependencies to Flink just because of a new 
data type. There is also no reason given for choosing this particular library. 
Couldn't we just use a plain Java Map? Otherwise I would propose adding a 
class for our own type, like we did for `org.apache.flink.types.Row`. Calcite 
is using `List`, which is not very nice, but would also work.
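
A tiny, self-contained sketch of the "plain Java Map" alternative, representing a multiset as element -> count (just an illustration, not a proposal for the final type):
```java
import java.util.HashMap;
import java.util.Map;

public class MultisetAsMap {
	public static void main(String[] args) {
		// a multiset of String represented as element -> count
		Map<String, Integer> multiset = new HashMap<>();
		multiset.merge("a", 1, Integer::sum); // add "a"
		multiset.merge("a", 1, Integer::sum); // add "a" again -> count 2
		multiset.merge("b", 1, Integer::sum); // add "b"
		System.out.println(multiset);         // {a=2, b=1}
	}
}
```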


---

