[jira] [Updated] (FLINK-8739) Optimize runtime support for distinct filter

2018-12-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8739:
--
Labels: pull-request-available  (was: )

> Optimize runtime support for distinct filter
> 
>
> Key: FLINK-8739
> URL: https://issues.apache.org/jira/browse/FLINK-8739
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Possible optimizations:
> 1. Decouple the distinct map and the actual accumulator so that they can be 
> created separately in codegen.
> 2. Reuse the same distinct accumulator for filtering, e.g. `SELECT 
> COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map.
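
A minimal sketch of optimization (2), assuming one value-to-count map shared by 
every DISTINCT aggregate (illustrative names only, not Flink's generated code):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch: one distinct map shared by COUNT(DISTINCT a) and SUM(DISTINCT a),
// instead of one map per aggregate.
public class SharedDistinctSketch {
    private final Map<Object, Long> distinctMap = new HashMap<>();

    /** Returns true only the first time a value is seen. */
    boolean add(Object value) {
        return distinctMap.merge(value, 1L, Long::sum) == 1L;
    }

    public static void main(String[] args) {
        SharedDistinctSketch distinct = new SharedDistinctSketch();
        long count = 0, sum = 0;
        for (int a : new int[] {3, 5, 3}) {
            if (distinct.add(a)) {  // both aggregates consult the same map
                count++;
                sum += a;
            }
        }
        System.out.println("count=" + count + ", sum=" + sum);  // count=2, sum=8
    }
}
{code}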



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu opened a new pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible

2018-12-11 Thread GitBox
dianfu opened a new pull request #7286: [FLINK-8739] [table] Optimize DISTINCT 
aggregates to use the same distinct accumulator if possible
URL: https://github.com/apache/flink/pull/7286
 
 
   ## What is the purpose of the change
   
   *This pull request optimizes DISTINCT aggregates such as `SELECT 
COUNT(DISTINCT(a)), SUM(DISTINCT(a))` to share the same distinct accumulator, 
which improves performance.*
   
   ## Brief change log
   
 - *Separate DistinctAccumulator and the actual accumulator*
 - *Optimize the AggregateCodeGeneration to reuse the same 
DistinctAccumulator if possible*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*SqlITCase#testDistinctAggOnRowTimeTumbleWindow, AggregateITCase, 
OverWindowITCase*.
   
   Still missing: I will add more harness tests based on #7253.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8739) Optimize runtime support for distinct filter

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718569#comment-16718569
 ] 

ASF GitHub Bot commented on FLINK-8739:
---

dianfu opened a new pull request #7286: [FLINK-8739] [table] Optimize DISTINCT 
aggregates to use the same distinct accumulator if possible
URL: https://github.com/apache/flink/pull/7286
 
 
   ## What is the purpose of the change
   
   *This pull request optimizes DISTINCT aggregates such as `SELECT 
COUNT(DISTINCT(a)), SUM(DISTINCT(a))` to share the same distinct accumulator, 
which improves performance.*
   
   ## Brief change log
   
 - *Separate DistinctAccumulator and the actual accumulator*
 - *Optimize the AggregateCodeGeneration to reuse the same 
DistinctAccumulator if possible*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*SqlITCase#testDistinctAggOnRowTimeTumbleWindow, AggregateITCase, 
OverWindowITCase*.
   
   Still missing: I will add more harness tests based on #7253.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Optimize runtime support for distinct filter
> 
>
> Key: FLINK-8739
> URL: https://issues.apache.org/jira/browse/FLINK-8739
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Possible optimizations:
> 1. Decouple the distinct map and the actual accumulator so that they can be 
> created separately in codegen.
> 2. Reuse the same distinct accumulator for filtering, e.g. `SELECT 
> COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8838) Add Support for UNNEST a MultiSet type field

2018-12-11 Thread lincoln.lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee closed FLINK-8838.
--

> Add Support for UNNEST a MultiSet type field
> 
>
> Key: FLINK-8838
> URL: https://issues.apache.org/jira/browse/FLINK-8838
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Major
> Fix For: 1.6.0
>
>
> MultiSetTypeInfo was introduced by FLINK-7491, but UNNEST supports the Array 
> type only, so it would be nice to support UNNEST on a MultiSet type field.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11138) RocksDB's WAL seems to be useless in the RocksDB state backend; disabling it would reduce disk overhead

2018-12-11 Thread cailiuyang (JIRA)
cailiuyang created FLINK-11138:
--

 Summary: RocksDB's WAL seems to be useless in the 
RocksDB state backend; disabling it would reduce disk overhead
 Key: FLINK-11138
 URL: https://issues.apache.org/jira/browse/FLINK-11138
 Project: Flink
  Issue Type: Improvement
Reporter: cailiuyang


RocksDB's WAL is used to recover after a crash, but after a Flink job crashes 
it always recovers from the last checkpoint, so the WAL seems useless.

Am I right?
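
For illustration, a minimal sketch of writing with the WAL disabled (an 
assumption about how the RocksDB Java API could be used, not Flink's actual 
backend code):
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;

// Sketch: skip RocksDB's WAL on writes and rely on Flink checkpoints for
// recovery instead.
public class NoWalWriteSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options opts = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(opts, "/tmp/rocksdb-no-wal");
             WriteOptions writeOpts = new WriteOptions().setDisableWAL(true)) {
            db.put(writeOpts, "key".getBytes(), "value".getBytes());
        }
    }
}
{code}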



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-5511) Add support for outer joins with local predicates

2018-12-11 Thread lincoln.lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee closed FLINK-5511.
--

> Add support for outer joins with local predicates
> -
>
> Key: FLINK-5511
> URL: https://issues.apache.org/jira/browse/FLINK-5511
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently the test case in 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> will throw a ValidationException indicating: “Invalid non-join predicate 'b < 
> 3'. For non-join predicates use Table#where.” 
> {code:title=JoinITCase.scala} 
> @Test(expected = classOf[ValidationException]) 
> def testNoJoinCondition(): Unit = { 
>  … 
>  val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 
> 'c) 
>  val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 
> 'f, 'g, 'h) 
>  val joinT = ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g) 
> } 
> {code} 
> This JIRA aims to support this kind of local predicate in outer joins. 
> A more detailed description: http://goo.gl/gK6vP3 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11137) Unexpected RegistrationTimeoutException of TaskExecutor

2018-12-11 Thread Biao Liu (JIRA)
Biao Liu created FLINK-11137:


 Summary: Unexpected RegistrationTimeoutException of TaskExecutor
 Key: FLINK-11137
 URL: https://issues.apache.org/jira/browse/FLINK-11137
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.7.0
Reporter: Biao Liu
Assignee: Biao Liu


There is a race condition in {{TaskExecutor}} between starting the registration 
with the RM and checking for a registration timeout. Currently we start the RM 
leader retriever first and then start the registration timeout check. If 
registration is fast enough, it may finish before the timeout check is even 
started, in which case the check fails later.
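
A minimal sketch of the kind of guard this implies (an illustrative assumption, 
not the actual fix):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RegistrationTimeoutSketch {
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    void onRegistrationSuccess() {
        registered.set(true);
    }

    void startRegistrationTimeout(long timeoutMillis) {
        timer.schedule(() -> {
            // The timeout must be a no-op if registration already finished,
            // otherwise a fast registration causes a spurious fatal error.
            if (!registered.get()) {
                System.err.println("Registration timed out, terminating.");
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        RegistrationTimeoutSketch te = new RegistrationTimeoutSketch();
        te.onRegistrationSuccess();       // registration completes first ...
        te.startRegistrationTimeout(10);  // ... so this timer must not fire
        Thread.sleep(50);
        te.timer.shutdown();
    }
}
{code}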

A stack trace of the exception is below:
{quote}2018-11-05 14:16:52,464 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in 
TaskExecutor akka.tcp://flink@/user/taskmanager_0.
 org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: 
Could not register at the ResourceManager within the specified maximum 
registration duration 30 ms. This indicates a problem with this instance. 
Terminating now.
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1110)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$4(TaskExecutor.java:1096)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
 at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
 at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
 at akka.actor.ActorCell.invoke(ActorCell.scala:495)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
 at akka.dispatch.Mailbox.run(Mailbox.scala:224)
{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718562#comment-16718562
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240905696
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
 
 Review comment:
   OK, thank you for your clarification. It seems that I misunderstood what you 
meant: I thought you meant we catch Exception because we cannot catch 
IllegalConfigurationException. Now I see that you mean we should break the 
catch down into particular exceptions. However, considering that 
`NetUtils.getPortRangeFromString` can throw at least two possible exceptions, 
one checked and one unchecked, I think we should catch Exception directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.
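
For illustration, a minimal sketch of what such a port-range bind could look 
like (using plain {{ServerSocket}}; the actual endpoint is Netty-based and the 
range syntax here is an assumption):
{code:java}
import java.io.IOException;
import java.net.ServerSocket;

public class PortRangeBindSketch {
    public static void main(String[] args) throws IOException {
        String bindPort = "8081-8085";  // e.g. a BIND_PORT-style range value
        String[] parts = bindPort.split("-", 2);
        int start = Integer.parseInt(parts[0]);
        int end = parts.length == 2 ? Integer.parseInt(parts[1]) : start;

        ServerSocket socket = null;
        for (int port = start; port <= end && socket == null; port++) {
            try {
                socket = new ServerSocket(port);
                System.out.println("Bound REST endpoint to port " + port);
            } catch (IOException e) {
                // Port already in use; try the next one in the range.
            }
        }
        if (socket == null) {
            throw new IOException("No free port in range " + bindPort);
        }
        socket.close();
    }
}
{code}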



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240905696
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
 
 Review comment:
   OK, thank you for your clarification. It seems that I misunderstood what you 
meant: I thought you meant we catch Exception because we cannot catch 
IllegalConfigurationException. Now I see that you mean we should break the 
catch down into particular exceptions. However, considering that 
`NetUtils.getPortRangeFromString` can throw at least two possible exceptions, 
one checked and one unchecked, I think we should catch Exception directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun opened a new pull request #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…

2018-12-11 Thread GitBox
TisonKun opened a new pull request #7285: [hotfix] [resource manager] Remove 
legacy(unused) class ResourceManag…
URL: https://github.com/apache/flink/pull/7285
 
 
   …erServices
   
   ## What is the purpose of the change
   
   We introduced `ResourceManagerService` in 
9d1b5fb7ab2b1dcb327d026d367bbd7dc3e66f94.
   
   As the code evolved, its functions were covered by `ResourceActions` (for 
`allocateResource`) and `FencedRpcEndpoint` (via `getFencingToken`, 
`getMainThreadExecutor` and `runAsync*`). 
   
   Now no code depends on `ResourceManagerService` and its functions are 
superseded, so we can safely remove it as a clean-up.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**not applicable**)
   
   cc @zentol @StefanRRichter 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718528#comment-16718528
 ] 

ASF GitHub Bot commented on FLINK-11081:


walterddr commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240900279
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
 
 Review comment:
   What I meant is "should we catch `RuntimeException`s". The function 
signature of `startActorSystem` explicitly throws a generic `Exception`, so 
there is no way to distinguish in the `BootstrapTool` use case. But here, since 
we are directly using `NetUtil.getPor...`, we might be able to do a better job. 
   
   Regarding whether we should catch it, I am fine with either decision as long 
as it is properly documented.
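   
   For illustration, a self-contained sketch of the narrower variant (the 
`parsePortRange` helper is hypothetical and stands in for 
`NetUtils.getPortRangeFromString`; the exact exception set is an assumption):
   
   ```java
   import java.util.Iterator;
   import java.util.stream.IntStream;

   public class PortRangeParseSketch {

       // Hypothetical stand-in for NetUtils.getPortRangeFromString.
       static Iterator<Integer> parsePortRange(String s) {
           String[] parts = s.split("-", 2);
           int start = Integer.parseInt(parts[0].trim());  // may throw NumberFormatException
           int end = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : start;
           return IntStream.rangeClosed(start, end).iterator();
       }

       public static void main(String[] args) {
           String restBindPort = "8081-8085";
           Iterator<Integer> portsIterator;
           try {
               // Catch the narrow failure instead of a blanket Exception.
               portsIterator = parsePortRange(restBindPort);
           } catch (NumberFormatException e) {
               throw new IllegalArgumentException(
                       "Invalid port range definition: " + restBindPort, e);
           }
           while (portsIterator.hasNext()) {
               System.out.println(portsIterator.next());
           }
       }
   }
   ```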


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
walterddr commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240900279
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
 
 Review comment:
   What I meant is "should we catch `RuntimeException`s". The function 
signature of `startActorSystem` explicitly throws a generic `Exception`, so 
there is no way to distinguish in the `BootstrapTool` use case. But here, since 
we are directly using `NetUtil.getPor...`, we might be able to do a better job. 
   
   Regarding whether we should catch it, I am fine with either decision as long 
as it is properly documented.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java

2018-12-11 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718513#comment-16718513
 ] 

sunjincheng commented on FLINK-11067:
-

There are two ways users use BatchTableEnvironment, as follows:
 1. Using api.scala.BatchTableEnvironment
{code:java}
import org.apache.flink.table.api.scala.BatchTableEnvironment
BatchTableEnvironment tEnv =
  TableEnvironment.getTableEnvironment(env);
{code}
 2. Using api.java.BatchTableEnvironment
{code:java}
import org.apache.flink.table.api.java.BatchTableEnvironment
BatchTableEnvironment tEnv =
  TableEnvironment.getTableEnvironment(env);
{code}
According to the current design, `flink-table-api-java`/`flink-table-api-scala` 
depend on `flink-table-api-base`, so `api.java.BatchTableEnvironment` and 
`api.scala.BatchTableEnvironment` cannot be imported in `flink-table-api-base`. 
So I share [~dian.fu]'s question: which package does `BatchTableEnvironment` 
belong to in the `getTableEnvironment()` definition?
{code:java}
static BatchTableEnvironment getTableEnvironment(BatchEnvironment env);
{code}

Bests,
Jincheng


> Port TableEnvironments to Java
> --
>
> Key: FLINK-11067
> URL: https://issues.apache.org/jira/browse/FLINK-11067
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, 
> {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided 
> and discussed. Some refactoring and clean up might be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718512#comment-16718512
 ] 

ASF GitHub Bot commented on FLINK-11107:


Myasuka edited a comment on issue #7281: [FLINK-11107][state] Avoid memory 
stateBackend to create arbitrary folders under HA path when no checkpoint path 
configured
URL: https://github.com/apache/flink/pull/7281#issuecomment-446297350
 
 
   @StephanEwen, since you left the comments below:
   ~~~
   to keep supporting the old behavior where default (JobManager) Backend + HA 
mode = checkpoints in HA store
   we add the HA persistence dir as the checkpoint directory if none other is 
set
   ~~~
   However, I'm wondering whether this keeps the same behavior as before. For 
Flink 1.3, (JobManager) Backend + HA mode = only a `completedCheckpoint` file 
is created under the HA folder. On the other hand, for Flink 1.6, this would 
create an additional `job-id/chk-x/_metadata` besides the completedCheckpoint 
file. Please correct me if I am wrong.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [state] Avoid memory stateBackend to create arbitrary folders under HA path 
> when no checkpoint path configured
> --
>
> Key: FLINK-11107
> URL: https://issues.apache.org/jira/browse/FLINK-11107
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.1
>
>
> Currently, the memory state backend would create a folder named with a random 
> UUID under the HA directory if no checkpoint path was ever configured (the 
> logic is located in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). 
> However, the default memory state backend is created not only on the JM side 
> but also on each task manager's side, which means many folders with random 
> UUIDs would be created under the HA directory. This can result in an 
> exception like:
> {noformat}
> The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 
> items=1048576{noformat}
>  If this happens, no new jobs can be submitted until we clean up those 
> directories manually.
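
For illustration, a minimal sketch of the problematic pattern (an assumption, 
not the actual {{StateBackendLoader}} code):
{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

public class UuidDirSketch {
    public static void main(String[] args) throws Exception {
        Path haDir = Paths.get("/tmp/flink/ha");
        Files.createDirectories(haDir);
        // Every JM/TM process derives one more random folder under the shared
        // HA path; with many TMs and restarts this eventually hits the file
        // system's directory item limit.
        Path checkpointDir = haDir.resolve(UUID.randomUUID().toString());
        Files.createDirectories(checkpointDir);
        System.out.println("Created " + checkpointDir);
    }
}
{code}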



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Myasuka edited a comment on issue #7281: [FLINK-11107][state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2018-12-11 Thread GitBox
Myasuka edited a comment on issue #7281: [FLINK-11107][state] Avoid memory 
stateBackend to create arbitrary folders under HA path when no checkpoint path 
configured
URL: https://github.com/apache/flink/pull/7281#issuecomment-446297350
 
 
   @StephanEwen, since you left the comments below:
   ~~~
   to keep supporting the old behavior where default (JobManager) Backend + HA 
mode = checkpoints in HA store
   we add the HA persistence dir as the checkpoint directory if none other is 
set
   ~~~
   However, I'm wondering whether this keeps the same behavior as before. For 
Flink 1.3, (JobManager) Backend + HA mode = only a `completedCheckpoint` file 
is created under the HA folder. On the other hand, for Flink 1.6, this would 
create an additional `job-id/chk-x/_metadata` besides the completedCheckpoint 
file. Please correct me if I am wrong.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718511#comment-16718511
 ] 

ASF GitHub Bot commented on FLINK-11107:


Myasuka commented on issue #7281: [FLINK-11107][state] Avoid memory 
stateBackend to create arbitrary folders under HA path when no checkpoint path 
configured
URL: https://github.com/apache/flink/pull/7281#issuecomment-446478434
 
 
   BTW, if we do not create UUID directories for the memory state backend in 
this situation, the job can still restore from high-availability storage. The 
only difference is that the `Latest Restore` information under the 
`Checkpoints` tab of the web UI would show the path as ``.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [state] Avoid memory stateBackend to create arbitrary folders under HA path 
> when no checkpoint path configured
> --
>
> Key: FLINK-11107
> URL: https://issues.apache.org/jira/browse/FLINK-11107
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.1
>
>
> Currently, the memory state backend would create a folder named with a random 
> UUID under the HA directory if no checkpoint path was ever configured (the 
> logic is located in {{StateBackendLoader#fromApplicationOrConfigOrDefault}}). 
> However, the default memory state backend is created not only on the JM side 
> but also on each task manager's side, which means many folders with random 
> UUIDs would be created under the HA directory. This can result in an 
> exception like:
> {noformat}
> The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 
> items=1048576{noformat}
>  If this happens, no new jobs can be submitted until we clean up those 
> directories manually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Myasuka commented on issue #7281: [FLINK-11107][state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2018-12-11 Thread GitBox
Myasuka commented on issue #7281: [FLINK-11107][state] Avoid memory 
stateBackend to create arbitrary folders under HA path when no checkpoint path 
configured
URL: https://github.com/apache/flink/pull/7281#issuecomment-446478434
 
 
   BTW, if we do not create UUID directories for the memory state backend in 
this situation, the job can still restore from high-availability storage. The 
only difference is that the `Latest Restore` information under the 
`Checkpoints` tab of the web UI would show the path as ``.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-11 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-4810:
---

Assignee: vinoyang

> Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful 
> checkpoints
> 
>
> Key: FLINK-4810
> URL: https://issues.apache.org/jira/browse/FLINK-4810
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The Checkpoint coordinator should track the number of consecutive 
> unsuccessful checkpoints.
> If more than {{n}} (configured value) checkpoints fail in a row, it should 
> call {{fail()}} on the execution graph to trigger a recovery.
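
A minimal sketch of the proposed counting logic (illustrative only, not the 
final Flink code):
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: fail the job once more than 'n' consecutive checkpoints fail;
// a successful checkpoint resets the streak.
public class CheckpointFailureCounterSketch {
    private final int maxFailedCheckpoints;
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);

    CheckpointFailureCounterSketch(int maxFailedCheckpoints) {
        this.maxFailedCheckpoints = maxFailedCheckpoints;
    }

    void onCheckpointCompleted() {
        consecutiveFailures.set(0);
    }

    void onCheckpointFailed(Runnable failJob) {
        if (consecutiveFailures.incrementAndGet() > maxFailedCheckpoints) {
            failJob.run();  // stands in for ExecutionGraph#fail()
        }
    }

    public static void main(String[] args) {
        CheckpointFailureCounterSketch c = new CheckpointFailureCounterSketch(2);
        c.onCheckpointFailed(() -> System.out.println("unexpected"));
        c.onCheckpointFailed(() -> System.out.println("unexpected"));
        c.onCheckpointFailed(() -> System.out.println("failing the execution graph"));
    }
}
{code}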



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-11 Thread GitBox
ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334#issuecomment-446471220
 
 
   Closing the PR as per request.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718490#comment-16718490
 ] 

ASF GitHub Bot commented on FLINK-4810:
---

ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334#issuecomment-446471220
 
 
   Closing the PR as per request.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful 
> checkpoints
> 
>
> Key: FLINK-4810
> URL: https://issues.apache.org/jira/browse/FLINK-4810
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> The Checkpoint coordinator should track the number of consecutive 
> unsuccessful checkpoints.
> If more than {{n}} (configured value) checkpoints fail in a row, it should 
> call {{fail()}} on the execution graph to trigger a recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-11 Thread GitBox
ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d9aea..9f453d0f2c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -132,6 +132,8 @@
 
/** The maximum number of checkpoints that may be in progress at the 
same time */
private final int maxConcurrentCheckpointAttempts;
+   /** The maximum number of unsuccessful checkpoints */
+   private final int maxFailedCheckpoints;
 
/** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
private final Timer timer;
@@ -142,6 +144,9 @@
/** The number of consecutive failed trigger attempts */
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger(0);
 
+   /** The number of consecutive failed checkpoints */
+   private final AtomicInteger numFailedCheckpoints = new AtomicInteger(0);
+
private ScheduledTrigger currentPeriodicTrigger;
 
/** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed */
@@ -163,6 +168,23 @@
private CheckpointStatsTracker statsTracker;
 
// 

+   public CheckpointCoordinator(
+   JobID job,
+   long baseInterval,
+   long checkpointTimeout,
+   long minPauseBetweenCheckpoints,
+   int maxConcurrentCheckpointAttempts,
+   ExternalizedCheckpointSettings externalizeSettings,
+   ExecutionVertex[] tasksToTrigger,
+   ExecutionVertex[] tasksToWaitFor,
+   ExecutionVertex[] tasksToCommitTo,
+   CheckpointIDCounter checkpointIDCounter,
+   CompletedCheckpointStore completedCheckpointStore,
+   String checkpointDirectory,
+   Executor executor) {
+   this(job, baseInterval, checkpointTimeout, 
minPauseBetweenCheckpoints, maxConcurrentCheckpointAttempts, 0, 
externalizeSettings, tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+   checkpointIDCounter, completedCheckpointStore, 
checkpointDirectory, executor);
+   }
 
public CheckpointCoordinator(
JobID job,
@@ -170,6 +192,7 @@ public CheckpointCoordinator(
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
+   int maxFailedCheckpoints,
ExternalizedCheckpointSettings externalizeSettings,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
@@ -184,6 +207,7 @@ public CheckpointCoordinator(
checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
+   checkArgument(maxFailedCheckpoints >= 0, "maxFailedCheckpoints 
must be >= 0");
 
if (externalizeSettings.externalizeCheckpoints() && 
checkpointDirectory == null) {
throw new IllegalStateException("CheckpointConfig says 
to persist periodic " +
@@ -207,6 +231,7 @@ public CheckpointCoordinator(
this.checkpointTimeout = checkpointTimeout;
this.minPauseBetweenCheckpointsNanos = 
minPauseBetweenCheckpoints * 1_000_000;
this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+   this.maxFailedCheckpoints = maxFailedCheckpoints;
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -461,6 +486,9 @@ CheckpointTriggerResult triggerCheckpoint(
catch (Throwable t) {
int numUnsuccessful = 
numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to 

[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718491#comment-16718491
 ] 

ASF GitHub Bot commented on FLINK-4810:
---

ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d9aea..9f453d0f2c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -132,6 +132,8 @@
 
/** The maximum number of checkpoints that may be in progress at the 
same time */
private final int maxConcurrentCheckpointAttempts;
+   /** The maximum number of unsuccessful checkpoints */
+   private final int maxFailedCheckpoints;
 
/** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
private final Timer timer;
@@ -142,6 +144,9 @@
/** The number of consecutive failed trigger attempts */
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger(0);
 
+   /** The number of consecutive failed checkpoints */
+   private final AtomicInteger numFailedCheckpoints = new AtomicInteger(0);
+
private ScheduledTrigger currentPeriodicTrigger;
 
/** The timestamp (via {@link System#nanoTime()}) when the last 
checkpoint completed */
@@ -163,6 +168,23 @@
private CheckpointStatsTracker statsTracker;
 
// 

+   public CheckpointCoordinator(
+   JobID job,
+   long baseInterval,
+   long checkpointTimeout,
+   long minPauseBetweenCheckpoints,
+   int maxConcurrentCheckpointAttempts,
+   ExternalizedCheckpointSettings externalizeSettings,
+   ExecutionVertex[] tasksToTrigger,
+   ExecutionVertex[] tasksToWaitFor,
+   ExecutionVertex[] tasksToCommitTo,
+   CheckpointIDCounter checkpointIDCounter,
+   CompletedCheckpointStore completedCheckpointStore,
+   String checkpointDirectory,
+   Executor executor) {
+   this(job, baseInterval, checkpointTimeout, 
minPauseBetweenCheckpoints, maxConcurrentCheckpointAttempts, 0, 
externalizeSettings, tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+   checkpointIDCounter, completedCheckpointStore, 
checkpointDirectory, executor);
+   }
 
public CheckpointCoordinator(
JobID job,
@@ -170,6 +192,7 @@ public CheckpointCoordinator(
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
+   int maxFailedCheckpoints,
ExternalizedCheckpointSettings externalizeSettings,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
@@ -184,6 +207,7 @@ public CheckpointCoordinator(
checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
+   checkArgument(maxFailedCheckpoints >= 0, "maxFailedCheckpoints 
must be >= 0");
 
if (externalizeSettings.externalizeCheckpoints() && 
checkpointDirectory == null) {
throw new IllegalStateException("CheckpointConfig says 
to persist periodic " +
@@ -207,6 +231,7 @@ public CheckpointCoordinator(
this.checkpointTimeout = checkpointTimeout;
this.minPauseBetweenCheckpointsNanos = 
minPauseBetweenCheckpoints * 1_000_000;
this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+   this.maxFailedCheckpoints = maxFailedCheckpoints;
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -461,6 +486,9 @@ 

[GitHub] lamber-ken removed a comment on issue #7279: [hotfix] add hadoop user name to yarn logs command

2018-12-11 Thread GitBox
lamber-ken removed a comment on issue #7279: [hotfix] add hadoop user name to 
yarn logs command
URL: https://github.com/apache/flink/pull/7279#issuecomment-446261251
 
 
   hi, @zentol, cc  :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718478#comment-16718478
 ] 

ASF GitHub Bot commented on FLINK-11048:


tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240885619
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   /**
+* Set savepoint restore settings that will be used when executing the 
job.
+* @param savepointRestoreSettings savepoint restore settings
+*/
+   public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
+   this.savepointRestoreSettings = savepointRestoreSettings;
 
 Review comment:
   Other than the job name, all other execution parameters already come from 
the "environment". To me this is not a big deal as long as we don't introduce 
even more clutter than there already is. Added another execute method instead. 
PTAL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   
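
A hedged usage sketch based on the setter shown in the review snippet above 
(host, port, jar and savepoint paths are placeholders; the thread notes the 
final API may expose an execute overload instead):
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;

public class SavepointRestoreSketch {
    public static void main(String[] args) throws Exception {
        RemoteStreamEnvironment env = new RemoteStreamEnvironment(
                "localhost", 8081, new Configuration(), "/path/to/job.jar");
        // Restore the job from the given savepoint when it is submitted.
        env.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath("/savepoints/savepoint-1"));
        env.fromElements(1, 2, 3).print();
        JobExecutionResult result = env.execute("job-with-savepoint-restore");
        System.out.println("Ran for " + result.getNetRuntime() + " ms");
    }
}
{code}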



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-11 Thread GitBox
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240885619
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   /**
+* Set savepoint restore settings that will be used when executing the 
job.
+* @param savepointRestoreSettings savepoint restore settings
+*/
+   public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
+   this.savepointRestoreSettings = savepointRestoreSettings;
 
 Review comment:
   Other than the job name, all other execution parameters already come from 
the "environment". To me this is not a big deal as long as we don't introduce 
even more clutter than there already is. Added another execute method instead. 
PTAL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718466#comment-16718466
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on issue #7263: [FLINK-11081] Support binding port range for 
REST server
URL: https://github.com/apache/flink/pull/7263#issuecomment-446460550
 
 
   `ConfigOption.PORT`? I cannot find it. 
   I guess what you mean is `RestOptions.PORT`. I think it can't be deprecated 
here. Initially it served both as the binding port of the server and the 
connection port of the client. Now that the server's binding port is 
represented by `BIND_PORT`, `PORT` can still be used for the client's 
connection port. In some places it may be used:
   
   ```
   private void createClientConfiguration(URI restAddress) {
       Configuration restClientConfig = new Configuration();
       restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
       restClientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
       this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
   }
   ```
   
   There should be two pairs of configuration items, `BIND_ADDRESS`/`BIND_PORT` 
and `ADDRESS`/`PORT`, here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718467#comment-16718467
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua edited a comment on issue #7263: [FLINK-11081] Support binding port 
range for REST server
URL: https://github.com/apache/flink/pull/7263#issuecomment-446460550
 
 
   `ConfigOption.PORT`? I cannot find it. 
   I guess what you mean is `RestOptions.PORT`. I think it can't be deprecated 
here. Initially it served both as the binding port of the server and the 
connection port of the client. Now that the server's binding port is 
represented by `BIND_PORT`, `PORT` can still be used for the client's 
connection port. In some places it may be used:
   
   ```java
   private void createClientConfiguration(URI restAddress) {
       Configuration restClientConfig = new Configuration();
       restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
       restClientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
       this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
   }
   ```
   
   There should be two pairs of configuration items, `BIND_ADDRESS`/`BIND_PORT` 
and `ADDRESS`/`PORT`, here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua edited a comment on issue #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua edited a comment on issue #7263: [FLINK-11081] Support binding port 
range for REST server
URL: https://github.com/apache/flink/pull/7263#issuecomment-446460550
 
 
   `ConfigOption.PORT`? I cannot find it. 
   I guess what you mean is `RestOptions.PORT`. I think it can't be deprecated 
here. Initially it served both as the binding port of the server and the 
connection port of the client. Now that the server's binding port is 
represented by `BIND_PORT`, `PORT` can still be used for the client's 
connection port. In some places it may be used:
   
   ```java
   private void createClientConfiguration(URI restAddress) {
       Configuration restClientConfig = new Configuration();
       restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
       restClientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
       this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
   }
   ```
   
   There should be two pairs of configuration items, `BIND_ADDRESS`/`BIND_PORT` 
and `ADDRESS`/`PORT`, here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on issue #7263: [FLINK-11081] Support binding port range for 
REST server
URL: https://github.com/apache/flink/pull/7263#issuecomment-446460550
 
 
   `ConfigOption.PORT`? I cannot find it. 
   I guess what you mean is `RestOptions.PORT`. I think it can't be deprecated 
here. Initially it served both as the binding port of the server and the 
connection port of the client. Now that the server's binding port is 
represented by `BIND_PORT`, `PORT` can still be used for the client's 
connection port. In some places it may be used:
   
   ```
   private void createClientConfiguration(URI restAddress) {
       Configuration restClientConfig = new Configuration();
       restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
       restClientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
       this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
   }
   ```
   
   There should be two pairs of configuration items, `BIND_ADDRESS`/`BIND_PORT` 
and `ADDRESS`/`PORT`, here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718464#comment-16718464
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240880017
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid port range definition: " + restBindPort);
+        }
+
+        int chosenPort = 0;
+        while (portsIterator.hasNext()) {
+            try {
+                chosenPort = portsIterator.next();
+                if (restBindAddress == null) {
+                    channel = bootstrap.bind(chosenPort);
+                } else {
+                    channel = bootstrap.bind(restBindAddress, chosenPort);
+                }
+                serverChannel = channel.syncUninterruptibly().channel();
+                break;
+            } catch (Exception e) {
+                // we can continue to try if this contains a netty channel exception
+                Throwable cause = e.getCause();
+                if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+                        cause instanceof java.net.BindException)) {
+                    throw e;
 
 Review comment:
   Yes, this expression may be misunderstood; I will update it later. Regarding the exception handling logic here, recent testing showed a real problem: we should not call
   
   ```java
   Throwable cause = e.getCause();
   ```
   
   because it can return a `null` value. I will fix it later.
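   For reference, a null-safe variant of that check might look like the following sketch (not the committed fix; `isBindFailure` is an illustrative helper name):
   
   ```java
   private static boolean isBindFailure(Exception e) {
       // getCause() may return null, so fall back to the exception itself
       Throwable cause = e.getCause() != null ? e.getCause() : e;
       return cause instanceof org.jboss.netty.channel.ChannelException
           || cause instanceof java.net.BindException;
   }
   ```
   
   The retry loop would then rethrow with `if (!isBindFailure(e)) { throw e; }` before moving on to the next port in the range.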


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718463#comment-16718463
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240880017
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid port range definition: " + restBindPort);
+        }
+
+        int chosenPort = 0;
+        while (portsIterator.hasNext()) {
+            try {
+                chosenPort = portsIterator.next();
+                if (restBindAddress == null) {
+                    channel = bootstrap.bind(chosenPort);
+                } else {
+                    channel = bootstrap.bind(restBindAddress, chosenPort);
+                }
+                serverChannel = channel.syncUninterruptibly().channel();
+                break;
+            } catch (Exception e) {
+                // we can continue to try if this contains a netty channel exception
+                Throwable cause = e.getCause();
+                if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+                        cause instanceof java.net.BindException)) {
+                    throw e;
 
 Review comment:
   Yes, this expression may be misunderstood; I will update it later. Regarding the exception handling logic here, recent testing showed a real problem: we should not call
   
   ```java
   Throwable cause = e.getCause();
   ```
   
   because it can return a `null` value. I will fix it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718462#comment-16718462
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240880004
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
 
 Review comment:
   Here we catch `Exception` so that both `NumberFormatException` and `IllegalConfigurationException` are caught, aren't they? Or did I misunderstand what you mean? Do you want me to catch a more specific exception?
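   For comparison, catching the specific types rather than `Exception` could look like this sketch (it assumes `getPortRangeFromString` fails only with these two exceptions, and it also chains the cause):
   
   ```java
   static Iterator<Integer> parsePortRange(String restBindPort) {
       try {
           return NetUtils.getPortRangeFromString(restBindPort);
       } catch (NumberFormatException | IllegalConfigurationException e) {
           // keep the original parse error as the cause
           throw new IllegalArgumentException("Invalid port range definition: " + restBindPort, e);
       }
   }
   ```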


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240880017
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid port range definition: " + restBindPort);
+        }
+
+        int chosenPort = 0;
+        while (portsIterator.hasNext()) {
+            try {
+                chosenPort = portsIterator.next();
+                if (restBindAddress == null) {
+                    channel = bootstrap.bind(chosenPort);
+                } else {
+                    channel = bootstrap.bind(restBindAddress, chosenPort);
+                }
+                serverChannel = channel.syncUninterruptibly().channel();
+                break;
+            } catch (Exception e) {
+                // we can continue to try if this contains a netty channel exception
+                Throwable cause = e.getCause();
+                if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+                        cause instanceof java.net.BindException)) {
+                    throw e;
 
 Review comment:
   Yes, this expression may be misunderstood; I will update it later. Regarding the exception handling logic here, recent testing showed a real problem: we should not call
   
   ```java
   Throwable cause = e.getCause();
   ```
   
   because it can return a `null` value. I will fix it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240880017
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid port range definition: " + restBindPort);
+        }
+
+        int chosenPort = 0;
+        while (portsIterator.hasNext()) {
+            try {
+                chosenPort = portsIterator.next();
+                if (restBindAddress == null) {
+                    channel = bootstrap.bind(chosenPort);
+                } else {
+                    channel = bootstrap.bind(restBindAddress, chosenPort);
+                }
+                serverChannel = channel.syncUninterruptibly().channel();
+                break;
+            } catch (Exception e) {
+                // we can continue to try if this contains a netty channel exception
+                Throwable cause = e.getCause();
+                if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+                        cause instanceof java.net.BindException)) {
+                    throw e;
 
 Review comment:
   Yes, this expression may be misunderstood; I will update it later. Regarding the exception handling logic here, recent testing showed a real problem: we should not call
   
   ```java
   Throwable cause = e.getCause();
   ```
   
   because it can return a `null` value. I will fix it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240880004
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
 
 Review comment:
   Here we catch `Exception` so that both `NumberFormatException` and `IllegalConfigurationException` are caught, aren't they? Or did I misunderstand what you mean? Do you want me to catch a more specific exception?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718447#comment-16718447
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877554
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
+
+Aggregate performs an aggregate operation with an aggregate function. You have to close the
+"aggregate" with a select statement. The output will be flattened if the output type is a
 
 Review comment:
   Add a sentence noting that an aggregate function can't be used in the select statement, as this is checked in the select.
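   For instance, the doc could contrast the two cases with a short sketch (hypothetical names, string-based API):
   
   ```java
   tableEnv.registerFunction("myAggFunc", new MyAggregateFunction());
   
   Table result = tab
       .aggregate("myAggFunc(a) as (f0, f1)")
       .select("f0, f1");   // OK: plain field references close the aggregate
   
   // tab.aggregate("myAggFunc(a) as (f0, f1)").select("sum(f0)")
   // would be rejected: aggregate functions are not allowed in this select
   ```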


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718457#comment-16718457
 ] 

ASF GitHub Bot commented on FLINK-11010:


walterddr commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446457485
 
 
   > @walterddr could you please check the timezone problem when dealing with event time?
   > It seems that lamber-ken's commit only fixed processing time.
   > Would you please refer to my previous comment and take a look at my event-time use case? Thanks a lot!
   > --update
   > I found that the correct timestamp is changed in OutputRowtimeProcessFunction using SqlFunctions.internalToTimestamp(); I'm not sure whether it's redundant or not.
   > Please fix this problem. Thx
   
   FLINK-11010 only reports the bug for `currentProcessingTime()`. Maybe we can mark this JIRA as a duplicate of FLINK-8353 and focus on addressing the bigger overall timezone problem there instead. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by simply invoking System.currentTimeMillis(),
> but the long value is then automatically wrapped into a Timestamp with the
> following statement:
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
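A minimal sketch of the mismatch described above (any non-UTC default timezone shifts the wrapped value away from the raw processing time):

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampDriftSketch {
    public static void main(String[] args) {
        long time = System.currentTimeMillis(); // what currentProcessingTime() returns
        Timestamp wrapped = new Timestamp(time - TimeZone.getDefault().getOffset(time));
        // e.g. in UTC+8 the difference is 8 * 3600 * 1000 ms, so SQL results
        // disagree with currentProcessingTime() by exactly the zone offset
        System.out.println(time - wrapped.getTime());
    }
}
{code}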



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-11 Thread GitBox
walterddr commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446457485
 
 
   > @walterddr could you please check the timezone problem when dealing with event time?
   > It seems that lamber-ken's commit only fixed processing time.
   > Would you please refer to my previous comment and take a look at my event-time use case? Thanks a lot!
   > --update
   > I found that the correct timestamp is changed in OutputRowtimeProcessFunction using SqlFunctions.internalToTimestamp(); I'm not sure whether it's redundant or not.
   > Please fix this problem. Thx
   
   FLINK-11010 only reports the bug for `currentProcessingTime()`. Maybe we can mark this JIRA as a duplicate of FLINK-8353 and focus on addressing the bigger overall timezone problem there instead. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718456#comment-16718456
 ] 

ASF GitHub Bot commented on FLINK-11081:


walterddr commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240877630
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
 
 Review comment:
   Hmm. `IllegalConfigurationException` extends `RuntimeException`; shouldn't we avoid catching that? I wonder whether the reason `BootstrapTool` handles it that way is that the static `startActorSystem` method declares a general `Exception` in its method signature.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718455#comment-16718455
 ] 

ASF GitHub Bot commented on FLINK-11081:


walterddr commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240877872
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid port range definition: " + restBindPort);
+        }
+
+        int chosenPort = 0;
+        while (portsIterator.hasNext()) {
+            try {
+                chosenPort = portsIterator.next();
+                if (restBindAddress == null) {
+                    channel = bootstrap.bind(chosenPort);
+                } else {
+                    channel = bootstrap.bind(restBindAddress, chosenPort);
+                }
+                serverChannel = channel.syncUninterruptibly().channel();
+                break;
+            } catch (Exception e) {
+                // we can continue to try if this contains a netty channel exception
+                Throwable cause = e.getCause();
+                if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+                        cause instanceof java.net.BindException)) {
+                    throw e;
 
 Review comment:
   You are right. Yeah, I think in both places the comment is a bit misleading. My reading of the comment is: "we can continue to retry the same port within this while loop", instead of the actual meaning: "we could allow users to keep trying when we throw the exception to them".


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
walterddr commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240877872
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid port range definition: " + restBindPort);
+        }
+
+        int chosenPort = 0;
+        while (portsIterator.hasNext()) {
+            try {
+                chosenPort = portsIterator.next();
+                if (restBindAddress == null) {
+                    channel = bootstrap.bind(chosenPort);
+                } else {
+                    channel = bootstrap.bind(restBindAddress, chosenPort);
+                }
+                serverChannel = channel.syncUninterruptibly().channel();
+                break;
+            } catch (Exception e) {
+                // we can continue to try if this contains a netty channel exception
+                Throwable cause = e.getCause();
+                if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+                        cause instanceof java.net.BindException)) {
+                    throw e;
 
 Review comment:
   You are right. Yeah, I think in both places the comment is a bit misleading. My reading of the comment is: "we can continue to retry the same port within this while loop", instead of the actual meaning: "we could allow users to keep trying when we throw the exception to them".


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
walterddr commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240877630
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
             .channel(NioServerSocketChannel.class)
             .childHandler(initializer);
 
-        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-        final ChannelFuture channel;
-        if (restBindAddress == null) {
-            channel = bootstrap.bind(restBindPort);
-        } else {
-            channel = bootstrap.bind(restBindAddress, restBindPort);
+        ChannelFuture channel;
+
+        // parse port range definition and create port iterator
+        Iterator<Integer> portsIterator;
+        try {
+            portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+        } catch (Exception e) {
 
 Review comment:
   Hmm. `IllegalConfigurationException` extends `RuntimeException`; shouldn't we avoid catching that? I wonder whether the reason `BootstrapTool` handles it that way is that the static `startActorSystem` method declares a general `Exception` in its method signature.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718450#comment-16718450
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877583
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +999,47 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+    * Performs a global aggregate operation with an aggregate function. Use this before a selection
+    * to perform the selection operation. The output will be flattened if the output type is a
+    * composite type.
+    *
+    * Example:
+    *
+    * {{{
+    *   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+    *   tableEnv.registerFunction("aggFunc", aggFunc);
+    *   table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1)
+    * }}}
+    */
+  def aggregate(aggregateFunction: Expression): AggregatedTable = {
+    unwrap(aggregateFunction, tableEnv) match {
+      case _: Aggregation =>
+      case _ => throw new ValidationException("Only AggregateFunction can be used in aggregate.")
+    }
+
+    new AggregatedTable(this, Nil, aggregateFunction)
+  }
+
+  /**
+    * Performs a global aggregate operation with an aggregate function. Use this before a selection
+    * to perform the selection operation. The output will be flattened if the output type is a
+    * composite type.
+    *
+    * Example:
+    *
+    * {{{
+    *   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+    *   tableEnv.registerFunction("aggFunc", aggFunc);
+    *   table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select("f0, f1")
+    * }}}
+    */
+  def aggregate(aggregateFunction: String): AggregatedTable = {
+    val withResolvedAggFunctionCall = replaceAggFunctionCall(
 
 Review comment:
   Replace with `groupBy().aggregate(aggregateFunction)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718446#comment-16718446
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877612
 
 

 ##
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ##
 @@ -418,4 +419,34 @@ object ProjectionTranslator {
       case e: Expression => e
     }
   }
+
+  /**
+    * Unwrap a Call to ScalarFunctionCall, TableFunctionCall, AggFunctionCall or the built-in
+    * expression representation.
+    *
+    * @param field    the expression to unwrap
+    * @param tableEnv the TableEnvironment
+    * @return the unwrapped expression
+    */
+  def unwrap(
+      field: Expression,
+      tableEnv: TableEnvironment): Expression = {
+    field match {
+      case Alias(child, _, _) =>
+        unwrap(child, tableEnv)
+      // Functions calls
+      case c @ Call(name, args: Seq[Expression]) =>
+        val function = tableEnv.functionCatalog.lookupFunction(name, args)
+        unwrap(function, tableEnv)
+      // Other expressions
+      case e: Expression => e
+    }
+  }
+
+  def extractFieldNames(expr: Expression): Seq[String] = {
+    expr match {
+      case Alias(child, name, extraNames) => Seq(name) ++ extraNames
 
 Review comment:
   Replace `child` with `_`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718453#comment-16718453
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877640
 
 

 ##
 File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
 ##
 @@ -128,4 +128,71 @@ class AggregateStringExpressionTest extends TableTestBase {
 
     verifyTableEquals(resJava, resScala)
   }
+
+  @Test
+  def testNonGroupedTableAggregate(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('a, 'b, 'c)
+
+    val testAgg = new CountMinMax
+    util.tableEnv.registerFunction("testAgg", testAgg)
+
+    // Expression / Scala API
+    val resScala = t
+      .aggregate(testAgg('a))
+      .select('f0, 'f1)
+
+    // String / Java API
+    val resJava = t
+      .aggregate("testAgg(a)")
+      .select("f0, f1")
+
+    verifyTableEquals(resScala, resJava)
+  }
+
+  @Test
+  def testTableAggregate(): Unit = {
 
 Review comment:
   How about renaming it to `testAggregate`, to distinguish it from the upcoming `flatAggregate` and `TableAggregateFunction`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718452#comment-16718452
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877626
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala
 ##
 @@ -208,4 +208,58 @@ class AggregateValidationTest extends TableTestBase {
 // must fail. UDAGG does not accept String type
 .select("myWeightedAvg(c, a)")
   }
+
 
 Review comment:
   Add a test to validate aggregate with two aggregate functions?
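   One possible shape for such a negative test, sketched with the string-based API (hypothetical expected behavior; the exact exception type and message depend on the implementation):
   
   ```java
   // uses the myWeightedAvg function already registered in this test class
   try {
       table.aggregate("myWeightedAvg(b, a), myWeightedAvg(a, b)");
       fail("Expected a ValidationException: aggregate() takes exactly one aggregate function");
   } catch (ValidationException e) {
       // expected
   }
   ```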


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718449#comment-16718449
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877569
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +999,47 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+    * Performs a global aggregate operation with an aggregate function. Use this before a selection
+    * to perform the selection operation. The output will be flattened if the output type is a
+    * composite type.
+    *
+    * Example:
+    *
+    * {{{
+    *   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+    *   tableEnv.registerFunction("aggFunc", aggFunc);
+    *   table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1)
+    * }}}
+    */
+  def aggregate(aggregateFunction: Expression): AggregatedTable = {
+    unwrap(aggregateFunction, tableEnv) match {
 
 Review comment:
   Line 1016:1021 is duplicated with line 1139:1144. How about change Line 
1016:1021 to:
   `groupBy().aggregate(aggregateFunction)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877547
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
 
 Review comment:
   Users may be confused to see both `Aggregations` and `Aggregate`.
   Should we divide the current `Operations` chapter into two sub-chapters?
   - SQL-based Operations
   - Non-SQL-based Operations
   
   Then we could put map/flatMap/aggregate/flatAggregate into the `Non-SQL-based Operations` chapter. That would be clearer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718451#comment-16718451
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877558
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
+
+Aggregate performs an aggregate operation with an aggregate function. You have to close the
+"aggregate" with a select statement. The output will be flattened if the output type is a
+composite type.
+
+
+
+{% highlight java %}
+tableEnv.registerFunction("myAggFunc", new MyAggregateFunction());
 
 Review comment:
   How about informing the user that `MyAggregateFunction` is an `AggregateFunction` by defining a variable: `AggregateFunction myAggFunc = new MyAggregateFunction()`, same as in the Scala example? We will have table aggregate functions later, and I think it would be nice to distinguish them.
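   Applied to the docs snippet, the suggestion might read like this sketch (the raw `AggregateFunction` type is kept unparameterized for brevity; `MyAggregateFunction` remains the docs' placeholder):
   
   ```java
   AggregateFunction myAggFunc = new MyAggregateFunction();
   tableEnv.registerFunction("myAggFunc", myAggFunc);
   
   Table result = tab
       .aggregate("myAggFunc(a) as (f0, f1)")
       .select("f0, f1");
   ```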


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718448#comment-16718448
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877602
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -1226,3 +1308,76 @@ class WindowGroupedTable(
 select(withResolvedAggFunctionCall: _*)
   }
 }
+
+class AggregatedTable(
+    private[flink] val table: Table,
+    private[flink] val groupKeys: Seq[Expression],
+    private[flink] val aggregateFunction: Expression) {
+
+  /**
+    * Performs a selection operation after an aggregate operation. The field expressions
+    * cannot contain table functions and aggregations.
+    *
+    * Example:
+    *
+    * {{{
+    *   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+    *   tableEnv.registerFunction("aggFunc", aggFunc);
+    *   table.groupBy('key).aggregate("aggFunc(a, b) as (f0, f1, f2)").select('key, 'f0, 'f1)
+    * }}}
+    */
+  def select(fields: Expression*): Table = {
+    val tableEnv = table.tableEnv
+    val (aggNames, propNames) = extractAggregationsAndProperties(Seq(aggregateFunction), tableEnv)
+    val projectsOnAgg = replaceAggregationsAndProperties(
+      groupKeys ++ Seq(aggregateFunction), tableEnv, aggNames, propNames)
+    val projectFields = extractFieldReferences(groupKeys ++ Seq(aggregateFunction))
+
+    val aggTable = new Table(tableEnv,
+      Project(projectsOnAgg,
+        Aggregate(groupKeys, aggNames.map(a => Alias(a._1, a._2)).toSeq,
+          Project(projectFields, table.logicalPlan).validate(tableEnv)
+        ).validate(tableEnv)
+      ).validate(tableEnv))
+
+    // expand the aggregate results
+    val projectsOnAggTable =
+      aggTable.logicalPlan.output.take(groupKeys.length) ++
+        expandProjectList(
+          Seq(Flattening(aggTable.logicalPlan.output.last)),
+          aggTable.logicalPlan,
+          tableEnv).zip(extractFieldNames(aggregateFunction)).map(a => Alias(a._1, a._2))
+
+    val expandedFields = expandProjectList(fields, aggTable.logicalPlan, tableEnv)
 
 Review comment:
   This `expandedFields` is used by the last `Project` below, so we can't expand the fields from the agg table. Instead, we should expand the fields from the flattened table produced after the aggregate.
   The error can be reproduced by calling `select("*")` after an aggregate, as in the sketch below.
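   A repro sketch per the comment (string-based API; `myAggFunc` is a placeholder for any registered `AggregateFunction`):
   
   ```java
   tableEnv.registerFunction("myAggFunc", new MyAggregateFunction());
   
   // fails before the fix: "*" is expanded against the wrong (pre-flattening) plan
   Table result = tab
       .aggregate("myAggFunc(a) as (f0, f1)")
       .select("*");
   ```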


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718445#comment-16718445
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877547
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
 
 Review comment:
   Users may be confused to see both `Aggregations` and `Aggregate`.
   Should we divide the current `Operations` chapter into two sub-chapters?
   - SQL-based Operations
   - Non-SQL-based Operations
   
   Then we could put map/flatMap/aggregate/flatAggregate into the `Non-SQL-based Operations` chapter. That would be clearer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718454#comment-16718454
 ] 

ASF GitHub Bot commented on FLINK-10976:


hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877547
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
 
 Review comment:
   Users may be confused to see both `Aggregations` and `Aggregate`.
   Should we divide the current `Operations` chapter into two sub-chapters?
   - SQL-based Operations
   - Non-SQL-based Operations
   
   Then we could put map/flatMap/aggregate/flatAggregate into the `Non-SQL-based Operations` chapter. That would be clearer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877583
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +999,47 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+    * Performs a global aggregate operation with an aggregate function. Use this before a selection
+    * to perform the selection operation. The output will be flattened if the output type is a
+    * composite type.
+    *
+    * Example:
+    *
+    * {{{
+    *   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+    *   tableEnv.registerFunction("aggFunc", aggFunc);
+    *   table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1)
+    * }}}
+    */
+  def aggregate(aggregateFunction: Expression): AggregatedTable = {
+    unwrap(aggregateFunction, tableEnv) match {
+      case _: Aggregation =>
+      case _ => throw new ValidationException("Only AggregateFunction can be used in aggregate.")
+    }
+
+    new AggregatedTable(this, Nil, aggregateFunction)
+  }
+
+  /**
+    * Performs a global aggregate operation with an aggregate function. Use this before a selection
+    * to perform the selection operation. The output will be flattened if the output type is a
+    * composite type.
+    *
+    * Example:
+    *
+    * {{{
+    *   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+    *   tableEnv.registerFunction("aggFunc", aggFunc);
+    *   table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select("f0, f1")
+    * }}}
+    */
+  def aggregate(aggregateFunction: String): AggregatedTable = {
+    val withResolvedAggFunctionCall = replaceAggFunctionCall(
 
 Review comment:
   Replace with `groupBy().aggregate(aggregateFunction)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877602
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -1226,3 +1308,76 @@ class WindowGroupedTable(
 select(withResolvedAggFunctionCall: _*)
   }
 }
+
+class AggregatedTable(
+private[flink] val table: Table,
+private[flink] val groupKeys: Seq[Expression],
+private[flink] val aggregateFunction: Expression) {
+
+  /**
+* Performs a selection operation after an aggregate operation. The field 
expressions
+* cannot contain table functions and aggregations.
+*
+* Example:
+*
+* {{{
+*   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+*   tableEnv.registerFunction("aggFunc", aggFunc);
+*   table.groupBy('key).aggregate("aggFunc(a, b) as (f0, f1, 
f2)").select('key, 'f0, 'f1)
+* }}}
+*/
+  def select(fields: Expression*): Table = {
+val tableEnv = table.tableEnv
+val (aggNames, propNames) = 
extractAggregationsAndProperties(Seq(aggregateFunction), tableEnv)
+val projectsOnAgg = replaceAggregationsAndProperties(
+  groupKeys ++ Seq(aggregateFunction), tableEnv, aggNames, propNames)
+val projectFields = extractFieldReferences(groupKeys ++ 
Seq(aggregateFunction))
+
+val aggTable = new Table(tableEnv,
+  Project(projectsOnAgg,
+Aggregate(groupKeys, aggNames.map(a => Alias(a._1, a._2)).toSeq,
+  Project(projectFields, table.logicalPlan).validate(tableEnv)
+).validate(tableEnv)
+  ).validate(tableEnv))
+
+// expand the aggregate results
+val projectsOnAggTable =
+  aggTable.logicalPlan.output.take(groupKeys.length) ++
+  expandProjectList(
+Seq(Flattening(aggTable.logicalPlan.output.last)),
+aggTable.logicalPlan,
+tableEnv).zip(extractFieldNames(aggregateFunction)).map(a => 
Alias(a._1, a._2))
+
+val expandedFields = expandProjectList(fields, aggTable.logicalPlan, 
tableEnv)
 
 Review comment:
   This `expandedFields` is used by the last `Project` below, so we can't 
expand fields from the agg table. Instead, we should expand fields from the 
flattened table after the aggregate. 
   The error can be reproduced by `select("*")` after aggregate.
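   
   A minimal reproduction of the case described above (`testAgg` is an illustrative registered `AggregateFunction`, not taken from the PR):
   
   ```scala
   // "*" must be expanded against the flattened output of the aggregate, not
   // against the intermediate agg table; otherwise this select fails.
   val result = table
     .aggregate("testAgg(a)")
     .select("*")
   ```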


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877626
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala
 ##
 @@ -208,4 +208,58 @@ class AggregateValidationTest extends TableTestBase {
 // must fail. UDAGG does not accept String type
 .select("myWeightedAvg(c, a)")
   }
+
 
 Review comment:
   Add a test to validate aggregate with two aggregate functions?
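   
   One possible shape for such a test, sketched under the assumption that `aggregate` rejects expressions containing more than one aggregate call:
   
   ```scala
   // Hypothetical validation test; the expected exception and the expression
   // shape are assumptions, not taken from the PR.
   @Test(expected = classOf[ValidationException])
   def testAggregateWithTwoAggregateFunctions(): Unit = {
     val util = batchTestUtil()
     val t = util.addTable[(Int, Long, String)]('a, 'b, 'c)

     // must fail: two aggregate functions in a single aggregate call
     t.aggregate('a.sum + 'b.count)
   }
   ```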


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877612
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ##
 @@ -418,4 +419,34 @@ object ProjectionTranslator {
   case e: Expression => e
 }
   }
+
+  /**
+* Unwrap a Call to ScalarFunctionCall, TableFunctionCall, AggFunctionCall 
or the built-in
+* expression representation.
+*
+* @param fieldthe expression to unwrap
+* @param tableEnv the TableEnvironment
+* @return the unwrapped expression
+*/
+  def unwrap(
+  field: Expression,
+  tableEnv: TableEnvironment): Expression = {
+field match {
+  case Alias(child, _, _) =>
+unwrap(child, tableEnv)
+  // Functions calls
+  case c @ Call(name, args: Seq[Expression]) =>
+val function = tableEnv.functionCatalog.lookupFunction(name, args)
+unwrap(function, tableEnv)
+  // Other expressions
+  case e: Expression => e
+}
+  }
+
+  def extractFieldNames(expr: Expression): Seq[String] = {
+expr match {
+  case Alias(child, name, extraNames) => Seq(name) ++ extraNames
 
 Review comment:
   Replace `child` with `_`.
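   
   The case after applying the suggestion, binding the unused child with `_`:
   
   ```scala
   // the child is not used on the right-hand side, so it can be bound with _
   case Alias(_, name, extraNames) => Seq(name) ++ extraNames
   ```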


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877554
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which 
aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
+
+Aggregate performs an aggregate operation with an aggregate function. You have 
to close the
+"aggregate" with a select statement. The output will be flattened if the 
output type is a
 
 Review comment:
   Add a sentence noting that an aggregate function can't be used in the select 
statement, as this is checked in the select. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877558
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which 
aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
+
+Aggregate performs an aggregate operation with an aggregate function. You have 
to close the
+"aggregate" with a select statement. The output will be flattened if the 
output type is a
+composite type.
+
+
+
+{% highlight java %}
+tableEnv.registerFunction("myAggFunc", new MyAggregateFunction());
 
 Review comment:
   How about informing the user that `MyAggregateFunction` is an `AggregateFunction` 
by defining a variable: `AggregateFunction myAggFunc = new 
MyAggregateFunction()`, same as in the Scala example? We will have table aggregate 
functions later, and I think it would be nice to distinguish them.
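   
   In the Scala variant of the doc example, the suggestion would look roughly like this (`MyAggregateFunction` is illustrative):
   
   ```scala
   // Declaring the variable with its AggregateFunction type makes it obvious
   // what kind of function is registered, and distinguishes it from the future
   // TableAggregateFunction.
   val myAggFunc: AggregateFunction[_, _] = new MyAggregateFunction
   tableEnv.registerFunction("myAggFunc", myAggFunc)
   ```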


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877547
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which 
aggregates are computed. `Ov
 
 {% top %}
 
+### Aggregate
 
 Review comment:
   Users may be confused to see `Aggregations` and `Aggregate`. 
   Should we divide the current `Operations` chapter into two sub-chapters?
   - SQL-based Operations
   - Non-SQL-based Operations
   
   And we can put all of map/flatmap/aggregate/flataggregate into the 
`Non-SQL-based Operations` chapter. That would be clearer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877569
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -999,6 +999,47 @@ class Table(
 new OverWindowedTable(this, overWindows.toArray)
   }
 
+  /**
+* Performs a global aggregate operation with an aggregate function. Use 
this before a selection
+* to perform the selection operation. The output will be flattened if the 
output type is a
+* composite type.
+*
+* Example:
+*
+* {{{
+*   val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction
+*   tableEnv.registerFunction("aggFunc", aggFunc);
+*   table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1)
+* }}}
+*/
+  def aggregate(aggregateFunction: Expression): AggregatedTable = {
+unwrap(aggregateFunction, tableEnv) match {
 
 Review comment:
   Lines 1016:1021 duplicate lines 1139:1144. How about changing lines 
1016:1021 to:
   `groupBy().aggregate(aggregateFunction)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2018-12-11 Thread GitBox
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] 
Add support for aggregate to table API
URL: https://github.com/apache/flink/pull/7235#discussion_r240877640
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
 ##
 @@ -128,4 +128,71 @@ class AggregateStringExpressionTest extends TableTestBase 
{
 
 verifyTableEquals(resJava, resScala)
   }
+
+  @Test
+  def testNonGroupedTableAggregate(): Unit = {
+val util = streamTestUtil()
+val t = util.addTable[(Int, Long, String)]('a, 'b, 'c)
+
+val testAgg = new CountMinMax
+util.tableEnv.registerFunction("testAgg", testAgg)
+
+// Expression / Scala API
+val resScala = t
+  .aggregate(testAgg('a))
+  .select('f0, 'f1)
+
+// String / Java API
+val resJava = t
+  .aggregate("testAgg(a)")
+  .select("f0, f1")
+
+verifyTableEquals(resScala, resJava)
+  }
+
+  @Test
+  def testTableAggregate(): Unit = {
 
 Review comment:
   How about renaming it to `testAggregate`, to distinguish it from the upcoming 
`flatAggregate` / `TableAggregateFunction`? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] samsai edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-11 Thread GitBox
samsai edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016
 
 
   @walterddr Could you please check the timezone problem when dealing with 
event time?  
   It seems that lamber-ken's commit only fixed processing time.  
   Would you please refer to my previous comment and take a look at my event-time 
use case? Thanks a lot!
   --update
   I found that the timestamp is changed in OutputRowtimeProcessFunction using 
SqlFunctions.internalToTimestamp(); I'm not sure whether it's redundant or 
not.  
   Please fix this problem. Thanks!
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718434#comment-16718434
 ] 

ASF GitHub Bot commented on FLINK-11010:


samsai edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL 
timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016
 
 
   @walterddr Could you please check the timezone problem when dealing with 
event time?  
   It seems that lamber-ken's commit only fixed processing time.  
   Would you please refer to my previous comment and take a look at my event-time 
use case? Thanks a lot!
   --update
   I found that the timestamp is changed in OutputRowtimeProcessFunction using 
SqlFunctions.internalToTimestamp(); I'm not sure whether it's redundant or 
not.  
   Please fix this problem. Thanks!
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value is automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
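
A minimal sketch of the inconsistency described above, assuming the JVM default timezone has a non-zero offset:

{code:scala}
// currentProcessingTime() effectively returns System.currentTimeMillis(),
// but the SQL layer wraps the value with the default timezone offset subtracted:
val time = System.currentTimeMillis()
val wrapped = new java.sql.Timestamp(time - java.util.TimeZone.getDefault.getOffset(time))
println(wrapped.getTime == time) // false whenever the default offset is non-zero
{code}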



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11136) Fix the logic of merge for DISTINCT aggregates

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718430#comment-16718430
 ] 

ASF GitHub Bot commented on FLINK-11136:


dianfu opened a new pull request #7284: [FLINK-11136] [table] Fix the merge 
logic of DISTINCT aggregates
URL: https://github.com/apache/flink/pull/7284
 
 
   ## What is the purpose of the change
   
   *This pull request fixes the merge logic of DISTINCT aggregates.*
   
   ## Brief change log
   
 - *Fix the codegen logic in AggregationCodeGenerator to extract the 
distinct fields for merge*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Updated integration tests 
SqlITCase#testDistinctAggWithMergeOnEventTimeSessionGroupWindow*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix the logic of merge for DISTINCT aggregates
> 
>
> Key: FLINK-11136
> URL: https://issues.apache.org/jira/browse/FLINK-11136
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> The merge logic for DISTINCT aggregates has a bug. For the following query:
> {code:java}
> SELECT
>   c,
>   COUNT(DISTINCT b),
>   SUM(DISTINCT b),
>   SESSION_END(rowtime, INTERVAL '0.005' SECOND)
> FROM MyTable
> GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code}
> the following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be 
> cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58)
> at 
> org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50)
> at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33)
> at 
> org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11136) Fix the logic of merge for DISTINCT aggregates

2018-12-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11136:
---
Labels: pull-request-available  (was: )

> Fix the logic of merge for DISTINCT aggregates
> 
>
> Key: FLINK-11136
> URL: https://issues.apache.org/jira/browse/FLINK-11136
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> The merge logic for DISTINCT aggregates has a bug. For the following query:
> {code:java}
> SELECT
>   c,
>   COUNT(DISTINCT b),
>   SUM(DISTINCT b),
>   SESSION_END(rowtime, INTERVAL '0.005' SECOND)
> FROM MyTable
> GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code}
> the following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be 
> cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58)
> at 
> org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50)
> at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66)
> at 
> org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33)
> at 
> org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463)
> at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
> at 
> org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11037) Introduce another greedy mechanism for distributing floating buffers

2018-12-11 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718422#comment-16718422
 ] 

zhijiang edited comment on FLINK-11037 at 12/12/18 3:38 AM:


[~StephanEwen], thanks for the proposals above. :)
 * The network stack is very performance-sensitive. As you mentioned, we should 
consider overall fairness to avoid backpressure to some extent.

 * I think your proposal is a proper way to move this feature forward, just like 
the flag we currently use to keep both credit and non-credit modes.

 * The adaptive algorithm sounds ideal but may behave unexpectedly in reality, so 
I agree with your point of starting with a simple, deterministic approach first; 
we can then proceed step by step based on further experiments.

Actually, I implemented the extreme greedy algorithm as the first credit-based 
version at Alibaba; it survived the challenges of Double 11 and ran in production 
for half a year. While contributing the whole feature to the community, I changed 
to the fair algorithm and refactored our private branch to keep it in sync with 
the community.

In fact, we have not compared the performance of these two algorithms in 
different scenarios, so it is hard to say which one is better. But I can think of 
two scenarios where the greedy algorithm should be better in theory. If there are 
enough floating buffers for all the input channels, then no matter whether the 
fair or the greedy algorithm is used, all the channels can always get all the 
required floating buffers from the pool. The other is a data-skew case where only 
one input channel receives a large amount of data while the other input channels 
are almost empty.

In these two cases the greedy way should perform better, because we only need one 
request to the pool to fetch all the floating buffers instead of looping many 
times to fetch one buffer each time, with each request entering the synchronized 
lock on the pool side. 


was (Author: zjwang):
[~StephanEwen], thanks for the proposals above. :)
 * The network stack is very performance-sensitive. As you mentioned, we should 
consider overall fairness to avoid backpressure to some extent.

 * I think your proposal is a proper way to move this feature forward, just like 
the flag we currently use to keep both credit and non-credit modes.

 * The adaptive algorithm sounds ideal but may behave unexpectedly in reality, so 
I agree with your point of starting with a simple, deterministic approach first; 
we can then proceed step by step based on further experiments. Actually, I 
implemented the extreme greedy algorithm as the first credit-based version at 
Alibaba; it survived the challenges of Double 11 and ran in production for half a 
year. While contributing the whole feature to the community, I changed to the 
fair algorithm and refactored our private branch to keep it in sync with the 
community. In fact, we have not compared the performance of these two algorithms 
in different scenarios, so it is hard to say which one is better. But I can think 
of one scenario where the greedy algorithm should be better in theory: if there 
are enough floating buffers for all the input channels, then no matter whether 
the fair or the greedy algorithm is used, all the channels can always get all the 
required floating buffers from the pool. In this case the greedy way should 
perform better, because we only need one request to the pool to fetch all the 
floating buffers instead of looping many times to fetch one buffer each time, 
with each request entering the synchronized lock on the pool side among different 
channels.

> Introduce another greedy mechanism for distributing floating buffers
> 
>
> Key: FLINK-11037
> URL: https://issues.apache.org/jira/browse/FLINK-11037
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> The current mechanism for distributing floating buffers is fair to all the 
> listeners. In detail, each input channel can only request one floating buffer 
> at a time, although the channel may actually need more floating buffers. The 
> channel then has to loop, requesting one floating buffer per iteration, until 
> all are satisfied or the pool is exhausted.
> Generally speaking, this seems fair to all the concurrent channels invoked by 
> the netty NIO thread. But every request to the LocalBufferPool needs to take a 
> synchronized lock, and it is hard to say which way of distributing the 
> available floating buffers behaves better in real scenarios.
> Therefore we propose another, greedy mechanism that requests more floating 
> buffers each time. In the extreme case, we can even request all the required 
> buffers at once, or a part of them, via configured parameters. On the other 
> side, the LocalBufferPool can also decide how many floating buffers should be 
> assigned based on factors such as the total number of channels and the total 
> number of floating buffers.

[GitHub] dianfu opened a new pull request #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates

2018-12-11 Thread GitBox
dianfu opened a new pull request #7284: [FLINK-11136] [table] Fix the merge 
logic of DISTINCT aggregates
URL: https://github.com/apache/flink/pull/7284
 
 
   ## What is the purpose of the change
   
   *This pull request fixes the merge logic of DISTINCT aggregates.*
   
   ## Brief change log
   
 - *Fix the codegen logic in AggregationCodeGenerator to extract the 
distinct fields for merge*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Updated integration tests 
SqlITCase#testDistinctAggWithMergeOnEventTimeSessionGroupWindow*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11136) Fix the logic of merge for DISTINCT aggregates

2018-12-11 Thread Dian Fu (JIRA)
Dian Fu created FLINK-11136:
---

 Summary: Fix the logic of merge for DISTINCT aggregates
 Key: FLINK-11136
 URL: https://issues.apache.org/jira/browse/FLINK-11136
 Project: Flink
  Issue Type: Test
  Components: Table API  SQL
Reporter: Dian Fu
Assignee: Dian Fu


The merge logic for DISTINCT aggregates has a bug. For the following query:
{code:java}
SELECT
  c,
  COUNT(DISTINCT b),
  SUM(DISTINCT b),
  SESSION_END(rowtime, INTERVAL '0.005' SECOND)
FROM MyTable
GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code}
the following exception will be thrown:
{code:java}
Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be 
cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58)
at 
org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50)
at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source)
at 
org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66)
at 
org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33)
at 
org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117)
at 
org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
at 
org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
at 
org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10817) Upgrade presto dependency to support path-style access

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718425#comment-16718425
 ] 

ASF GitHub Bot commented on FLINK-10817:


adamlamar commented on issue #7058: [FLINK-10817] Upgrade presto dependency to 
support path-style access
URL: https://github.com/apache/flink/pull/7058#issuecomment-446449939
 
 
   @zentol @aljoscha With FLINK-10987 merged, is this ready for another review?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Upgrade presto dependency to support path-style access
> --
>
> Key: FLINK-10817
> URL: https://issues.apache.org/jira/browse/FLINK-10817
> Project: Flink
>  Issue Type: Improvement
>Reporter: Adam Lamar
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>
> In order to use any given non-AWS s3 implementation backed by the presto s3 
> filesystem, it is necessary to set at least one configuration parameter in 
> flink-conf.yaml:
>  * presto.s3.endpoint: https://example.com
> This appears to work as expected for hosted s3 alternatives.
> In order to use a bring-your-own, self-hosted s3 alternative like 
> [minio|https://www.minio.io/], at least two configuration parameters are 
> required:
>  * presto.s3.endpoint: https://example.com
>  * presto.s3.path-style-access: true
> However, the second path-style-access parameter doesn't work because the 
> 0.185 version of presto doesn't support passing through that configuration 
> option to the hive s3 client.
> To work around the issue, path-style-access can be forced on the s3 client by 
> using an IP address for the endpoint (instead of a hostname). Without this 
> workaround, flink attempts to use the virtualhost-style at 
> bucketname.example.com, which fails unless the expected DNS records exist.
> To solve this problem and enable non-IP endpoints, upgrade the 
> [pom|https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36]
>  to at least 0.186 which includes [this 
> commit|https://github.com/prestodb/presto/commit/0707f2f21a96d2fd30953fb3fa9a9a03f03d88bd]
>  Note that 0.213 is the latest presto release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] adamlamar commented on issue #7058: [FLINK-10817] Upgrade presto dependency to support path-style access

2018-12-11 Thread GitBox
adamlamar commented on issue #7058: [FLINK-10817] Upgrade presto dependency to 
support path-style access
URL: https://github.com/apache/flink/pull/7058#issuecomment-446449939
 
 
   @zentol @aljoscha With FLINK-10987 merged, is this ready for another review?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11037) Introduce another greedy mechanism for distributing floating buffers

2018-12-11 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718422#comment-16718422
 ] 

zhijiang commented on FLINK-11037:
--

[~StephanEwen], thanks for the proposals above. :)
 * The network stack is very performance-sensitive. As you mentioned, we should 
consider overall fairness to avoid backpressure to some extent.

 * I think your proposal is a proper way to move this feature forward, just like 
the flag we currently use to keep both credit and non-credit modes.

 * The adaptive algorithm sounds ideal but may behave unexpectedly in reality, so 
I agree with your point of starting with a simple, deterministic approach first; 
we can then proceed step by step based on further experiments. Actually, I 
implemented the extreme greedy algorithm as the first credit-based version at 
Alibaba; it survived the challenges of Double 11 and ran in production for half a 
year. While contributing the whole feature to the community, I changed to the 
fair algorithm and refactored our private branch to keep it in sync with the 
community. In fact, we have not compared the performance of these two algorithms 
in different scenarios, so it is hard to say which one is better. But I can think 
of one scenario where the greedy algorithm should be better in theory: if there 
are enough floating buffers for all the input channels, then no matter whether 
the fair or the greedy algorithm is used, all the channels can always get all the 
required floating buffers from the pool. In this case the greedy way should 
perform better, because we only need one request to the pool to fetch all the 
floating buffers instead of looping many times to fetch one buffer each time, 
with each request entering the synchronized lock on the pool side among different 
channels.

> Introduce another greedy mechanism for distributing floating buffers
> 
>
> Key: FLINK-11037
> URL: https://issues.apache.org/jira/browse/FLINK-11037
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> The current mechanism for distributing floating buffers is fair to all the 
> listeners. In detail, each input channel can only request one floating buffer 
> at a time, although the channel may actually need more floating buffers. The 
> channel then has to loop, requesting one floating buffer per iteration, until 
> all are satisfied or the pool is exhausted.
> Generally speaking, this seems fair to all the concurrent channels invoked by 
> the netty NIO thread. But every request to the LocalBufferPool needs to take a 
> synchronized lock, and it is hard to say which way of distributing the 
> available floating buffers behaves better in real scenarios.
> Therefore we propose another, greedy mechanism that requests more floating 
> buffers each time. In the extreme case, we can even request all the required 
> buffers at once, or a part of them, via configured parameters. On the other 
> side, the LocalBufferPool can also decide how many floating buffers should be 
> assigned based on factors such as the total number of channels and the total 
> number of floating buffers.
> The motivation is making better use of floating buffer resources, and it may 
> need extra metrics for adjusting the mechanism dynamically.
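
A minimal sketch of the two request patterns, using a hypothetical pool interface rather than Flink's actual classes:

{code:scala}
trait Buffer
trait BufferPool {
  def requestBuffer(): Option[Buffer]     // one buffer per synchronized pool call
  def requestBuffers(n: Int): Seq[Buffer] // up to n buffers in one synchronized call
}

// fair: loop, entering the pool lock once per buffer, until satisfied or exhausted
def requestFair(pool: BufferPool, needed: Int): Seq[Buffer] =
  Iterator.continually(pool.requestBuffer())
    .takeWhile(_.isDefined)
    .take(needed)
    .flatten
    .toSeq

// greedy: one pool call for all required buffers, entering the lock only once
def requestGreedy(pool: BufferPool, needed: Int): Seq[Buffer] =
  pool.requestBuffers(needed)
{code}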



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11135) Reduce priority of the deprecated Hadoop specific Flink conf options

2018-12-11 Thread Paul Lin (JIRA)
Paul Lin created FLINK-11135:


 Summary: Reduce priority of the deprecated Hadoop specific Flink 
conf options
 Key: FLINK-11135
 URL: https://issues.apache.org/jira/browse/FLINK-11135
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.7.0, 1.6.2
Reporter: Paul Lin
Assignee: Paul Lin


In [FLINK-7967] we marked the Hadoop-specific Flink configuration options as 
deprecated and recommended that users use the HADOOP_CONF_DIR env variable 
instead, but if both are configured, Flink still looks for Hadoop configurations 
in the path specified by the Flink configuration first. Plus, the code that 
searches for Hadoop configurations via `fs.hdfs.hadoopconf` is wrongly placed in 
the env-variables section. 

I think the search order should be: HADOOP_CONF_DIR > Flink conf options > 
HADOOP_HOME.
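
A hedged sketch of the proposed lookup order; the helper name and the HADOOP_HOME sub-path are assumptions for illustration:

{code:scala}
// Proposed order: HADOOP_CONF_DIR > Flink conf options > HADOOP_HOME
def resolveHadoopConfDir(flinkConf: Map[String, String]): Option[String] =
  sys.env.get("HADOOP_CONF_DIR")
    .orElse(flinkConf.get("fs.hdfs.hadoopconf"))
    .orElse(sys.env.get("HADOOP_HOME").map(_ + "/etc/hadoop"))
{code}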



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] KarmaGYZ opened a new pull request #7283: [hotfix][docs] Fix the mismatch generic type in hadoop compatibility doc

2018-12-11 Thread GitBox
KarmaGYZ opened a new pull request #7283: [hotfix][docs] Fix the mismatch 
generic type in hadoop compatibility doc
URL: https://github.com/apache/flink/pull/7283
 
 
   ## What is the purpose of the change
   
   This PR fixes the mismatched generic types in the Hadoop compatibility doc.
   
   ## Brief change log
   
   - Fix the mismatched generic type in section **Using Hadoop Mappers and 
Reducers** according to the context
   - Fix the mismatched generic type in section **Complete Hadoop WordCount 
Example**
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718417#comment-16718417
 ] 

ASF GitHub Bot commented on FLINK-11010:


samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is 
inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016
 
 
   @walterddr Could you please check the timezone problem when dealing with 
event time?  
   It seems that lamber-ken's commit only fixed processing time.  
   Would you please refer to my previous comment and take a look at my event-time 
use case? Thanks a lot!
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> The processing time is implemented by invoking System.currentTimeMillis(), 
> but the long value is automatically wrapped into a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-11 Thread GitBox
samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is 
inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016
 
 
   @walterddr Could you please check the timezone problem when dealing with 
event time?  
   It seems that lamber-ken's commit only fixed processing time.  
   Would you please refer to my previous comment and take a look at my event-time 
use case? Thanks a lot!
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11134) Invalid REST API request should not log the full exception in Flink logs

2018-12-11 Thread Mark Cho (JIRA)
Mark Cho created FLINK-11134:


 Summary: Invalid REST API request should not log the full 
exception in Flink logs
 Key: FLINK-11134
 URL: https://issues.apache.org/jira/browse/FLINK-11134
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.0
Reporter: Mark Cho


{code:java}
2018-12-11 17:52:19,207 ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception 
occurred in REST handler.
org.apache.flink.runtime.rest.NotFoundException: Job 
15d06690e88d309aa1bdbb6ce7c6dcd1 not found
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:133)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
not find Flink job (15d06690e88d309aa1bdbb6ce7c6dcd1)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:485)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at 

[GitHub] KarmaGYZ opened a new pull request #7282: [hotfix][docs] improve docs of batch overview

2018-12-11 Thread GitBox
KarmaGYZ opened a new pull request #7282: [hotfix][docs] improve docs of batch 
overview
URL: https://github.com/apache/flink/pull/7282
 
 
   ## What is the purpose of the change
   
   This PR improves the Batch Overview docs, making the sample code for 
partitionCustom more readable.
   
   ## Brief change log
   
   - Improve the sample code of custom partitioning in java and scala
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java

2018-12-11 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718401#comment-16718401
 ] 

Dian Fu commented on FLINK-11067:
-

Hi [~twalthr], thanks a lot for the solution. I agree with you that we should 
expose interfaces instead of implementations.
I have some quick questions and am looking forward to your reply:
1. Regarding the return type BatchTableEnvironment of the following API:
{code:java}
static BatchTableEnvironment getTableEnvironment(BatchEnvironment env);{code}
What's the package name of BatchTableEnvironment? It seems that backwards 
compatibility will be broken no matter what it is.

2. Does the facade class for Batch/StreamTableEnvironment need to cover both 
Scala and Java, or will we have separate facade classes for Scala and Java?

 

 

> Port TableEnvironments to Java
> --
>
> Key: FLINK-11067
> URL: https://issues.apache.org/jira/browse/FLINK-11067
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, 
> {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided 
> and discussed. Some refactoring and clean up might be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11133) FsCheckpointStorage is unaware about S3 entropy when creating directories

2018-12-11 Thread Mark Cho (JIRA)
Mark Cho created FLINK-11133:


 Summary: FsCheckpointStorage is unaware about S3 entropy when 
creating directories
 Key: FLINK-11133
 URL: https://issues.apache.org/jira/browse/FLINK-11133
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.0
Reporter: Mark Cho


We currently use S3 for our checkpoint storage with S3 entropy enabled.

Entropy seems to be working correctly when writing out the checkpoint metadata 
file (the entropy key is correctly stripped from `state.checkpoints.dir`) and 
when writing out checkpoint data files (the entropy key is correctly replaced 
with a random string).

However, from the logs, it seems the entropy key is neither stripped nor 
replaced when `FsCheckpointStorage` creates directories in the following class:

[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java#L83-L85]

Should FsCheckpointStorage skip the initial mkdir calls when an object store 
like S3 is used, since S3 doesn't have a directory concept?

If we want to keep the `mkdir` calls in `FsCheckpointStorage`, we should handle 
the entropy key specified in `state.checkpoints.dir`. Currently, folder markers 
in S3 are being created by the Hadoop FileSystem with the entropy key left in 
the path, as a result of the `mkdir` calls in `FsCheckpointStorage`.
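
A minimal sketch of the entropy handling described above; the marker string and the replacement policy are illustrative, not Flink's exact implementation:

{code:scala}
// Metadata paths: strip the entropy marker; data paths: replace it with a
// random string so the objects spread across S3 partitions. Directory creation
// would need the same resolution (or be skipped for object stores).
def resolveEntropy(path: String, marker: String, entropy: Option[String]): String =
  entropy match {
    case Some(e) => path.replace(marker, e)        // data file: inject entropy
    case None    => path.replace(marker + "/", "") // metadata: strip the marker
  }
{code}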



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718365#comment-16718365
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446434187
 
 
   @azagrebin Thank you for your explanation. 
   Following your explanation, I will 
   - first move `DirectExecutorService` into 
`org.apache.flink.runtime.concurrent` and change `Executor 
Executors#directExecutor()` to `DirectExecutorService 
Executors#directExecutorService()`; this will be done in a separate commit.
   - then use `Executors#directExecutorService()` in the current patch to share 
the logic between `threadNum = 1` and `threadNum > 1`.
   
   Did I understand right? If yes, I will file an issue and implement the first 
move, and then come back to complete this patch.
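   
   A hedged sketch of the shared code path; `directExecutorService` stands in for the method proposed above and is passed as a parameter so the snippet stays self-contained:
   
   ```scala
   import java.util.concurrent.{ExecutorService, Executors => JExecutors}

   // With a direct executor service, threadNum == 1 runs the download tasks
   // inline through the same submission logic used for threadNum > 1.
   def downloadExecutor(threadNum: Int, directExecutorService: () => ExecutorService): ExecutorService =
     if (threadNum <= 1) directExecutorService()
     else JExecutors.newFixedThreadPool(threadNum)
   ```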


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, the restore downloads files from DFS, and the 
> download procedure is single-threaded; it could be sped up by using multiple 
> threads for downloading states from DFS.
>  
> In my company, the states come to some terabytes, so the restore procedure 
> becomes a little slow. After a bit of digging, I found that states are 
> downloaded from DFS using a single thread, which could be sped up with 
> multiple threads.
> I tested the time needed to download ~2 terabytes of states from DFS: with a 
> single thread it took 640+ s, and 130+ s when using 5 threads for the download.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718372#comment-16718372
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on issue #7263: [FLINK-11081] Support binding port range for 
REST server
URL: https://github.com/apache/flink/pull/7263#issuecomment-446434964
 
 
   Hi @walterddr, thank you for your review. I responded to the comments and I 
think some of the suggestions make sense. In addition, rest.port will not be 
deprecated; going forward it will only be used by the REST client, which makes 
its responsibilities clearer (it was originally used by both the server and the 
client). See the description of this issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on issue #7263: [FLINK-11081] Support binding port range for 
REST server
URL: https://github.com/apache/flink/pull/7263#issuecomment-446434964
 
 
   Hi @walterddr, thank you for your review. I responded to the comments, and 
I think some of the suggestions make sense. In addition, rest.port will not be 
deprecated; from now on it will only be used by the REST client, which makes 
its responsibilities clearer (it was originally used by both the server and 
the client). See the description of this issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718363#comment-16718363
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240860330
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointConfigurationTest.java
 ##
 @@ -49,17 +77,154 @@ public void testBasicMapping() throws 
ConfigurationException {
Configuration originalConfig = new Configuration();
originalConfig.setString(RestOptions.ADDRESS, ADDRESS);
originalConfig.setString(RestOptions.BIND_ADDRESS, 
BIND_ADDRESS);
-   originalConfig.setInteger(RestOptions.PORT, PORT);
+   originalConfig.setString(RestOptions.BIND_PORT, BIND_PORT);

originalConfig.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 
CONTENT_LENGTH);
originalConfig.setString(WebOptions.TMP_DIR, 
temporaryFolder.getRoot().getAbsolutePath());
 
final RestServerEndpointConfiguration result = 
RestServerEndpointConfiguration.fromConfiguration(originalConfig);
Assert.assertEquals(ADDRESS, result.getRestAddress());
Assert.assertEquals(BIND_ADDRESS, result.getRestBindAddress());
-   Assert.assertEquals(PORT, result.getRestBindPort());
+   Assert.assertEquals(BIND_PORT, result.getRestBindPort());
Assert.assertEquals(CONTENT_LENGTH, 
result.getMaxContentLength());
Assert.assertThat(
result.getUploadDir().toAbsolutePath().toString(),

containsString(temporaryFolder.getRoot().getAbsolutePath()));
}
+
 
 Review comment:
   OK
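
   A short usage sketch under this PR's assumptions (`RestOptions#BIND_PORT` 
is a string option that may hold a single port or a range):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

class RestBindPortUsage {

    static Configuration restConfig() {
        Configuration config = new Configuration();
        config.setString(RestOptions.BIND_ADDRESS, "0.0.0.0");
        // a port range; a single port such as "8081" would work as well
        config.setString(RestOptions.BIND_PORT, "50100-50200");
        return config;
    }
}
{code}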


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718361#comment-16718361
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240860279
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator<Integer> portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
+   throw new IllegalArgumentException("Invalid 
port range definition: " + restBindPort);
+   }
+
+   int chosenPort = 0;
+   while (portsIterator.hasNext()) {
+   try {
+   chosenPort = portsIterator.next();
+   if (restBindAddress == null) {
+   channel = 
bootstrap.bind(chosenPort);
+   } else {
+   channel = 
bootstrap.bind(restBindAddress, chosenPort);
+   }
+   serverChannel = 
channel.syncUninterruptibly().channel();
+   break;
+   } catch (Exception e) {
 
 Review comment:
   I will add a test that makes the first port conflict, so that the second one is chosen.
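
   A rough sketch of such a test, using plain ServerSockets instead of the 
actual RestServerEndpoint to illustrate the expected behavior:

{code:java}
import java.io.IOException;
import java.net.ServerSocket;

class PortRangeBindSketch {

    // scan the range and return the first port that can be bound
    static int bindFirstFree(int firstPort, int lastPort) throws IOException {
        for (int port = firstPort; port <= lastPort; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return socket.getLocalPort();
            } catch (IOException e) {
                // port in use, try the next one in the range
            }
        }
        throw new IOException("no free port in range " + firstPort + "-" + lastPort);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket blocker = new ServerSocket(0)) {
            int first = blocker.getLocalPort();
            // "first" is occupied, so the scan should fall through to a later port
            int chosen = bindFirstFree(first, first + 10);
            System.out.println("occupied=" + first + ", chosen=" + chosen);
        }
    }
}
{code}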


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-11 Thread GitBox
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-446434187
 
 
   @azagrebin Thank you for your explanation. 
   Following your explanation, I will:
   - first move `DirectExecutorService` into 
`org.apache.flink.runtime.concurrent` and change the `Executor 
Executors#directExecutor()` to `DirectExecutorService 
Executors#directExecutorService()`; this will be done in a separate commit.
   - then use `Executors#directExecutorService()` in the current patch to share 
the logic between `threadNum = 1` and `threadNum > 1`.
   
   Did I understand correctly? If yes, I will file an issue and implement the 
first move, and then come back to complete this patch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240860279
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator<Integer> portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
+   throw new IllegalArgumentException("Invalid 
port range definition: " + restBindPort);
+   }
+
+   int chosenPort = 0;
+   while (portsIterator.hasNext()) {
+   try {
+   chosenPort = portsIterator.next();
+   if (restBindAddress == null) {
+   channel = 
bootstrap.bind(chosenPort);
+   } else {
+   channel = 
bootstrap.bind(restBindAddress, chosenPort);
+   }
+   serverChannel = 
channel.syncUninterruptibly().channel();
+   break;
+   } catch (Exception e) {
 
 Review comment:
   I will add a test that makes the first port conflict, so that the second one is chosen.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240860330
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointConfigurationTest.java
 ##
 @@ -49,17 +77,154 @@ public void testBasicMapping() throws 
ConfigurationException {
Configuration originalConfig = new Configuration();
originalConfig.setString(RestOptions.ADDRESS, ADDRESS);
originalConfig.setString(RestOptions.BIND_ADDRESS, 
BIND_ADDRESS);
-   originalConfig.setInteger(RestOptions.PORT, PORT);
+   originalConfig.setString(RestOptions.BIND_PORT, BIND_PORT);

originalConfig.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, 
CONTENT_LENGTH);
originalConfig.setString(WebOptions.TMP_DIR, 
temporaryFolder.getRoot().getAbsolutePath());
 
final RestServerEndpointConfiguration result = 
RestServerEndpointConfiguration.fromConfiguration(originalConfig);
Assert.assertEquals(ADDRESS, result.getRestAddress());
Assert.assertEquals(BIND_ADDRESS, result.getRestBindAddress());
-   Assert.assertEquals(PORT, result.getRestBindPort());
+   Assert.assertEquals(BIND_PORT, result.getRestBindPort());
Assert.assertEquals(CONTENT_LENGTH, 
result.getMaxContentLength());
Assert.assertThat(
result.getUploadDir().toAbsolutePath().toString(),

containsString(temporaryFolder.getRoot().getAbsolutePath()));
}
+
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718357#comment-16718357
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240859949
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator<Integer> portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
+   throw new IllegalArgumentException("Invalid 
port range definition: " + restBindPort);
+   }
+
+   int chosenPort = 0;
+   while (portsIterator.hasNext()) {
+   try {
+   chosenPort = portsIterator.next();
+   if (restBindAddress == null) {
+   channel = 
bootstrap.bind(chosenPort);
+   } else {
+   channel = 
bootstrap.bind(restBindAddress, chosenPort);
+   }
+   serverChannel = 
channel.syncUninterruptibly().channel();
+   break;
+   } catch (Exception e) {
+   // we can continue to try if this 
contains a netty channel exception
+   Throwable cause = e.getCause();
+   if (!(cause instanceof 
org.jboss.netty.channel.ChannelException ||
+   cause instanceof 
java.net.BindException)) {
+   throw e;
 
 Review comment:
   Here, we only retry when the exception is a port-conflict-specific 
exception. Otherwise we should rethrow it, so the caller knows that a 
different exception occurred. Also, see `BootstrapTools#startActorSystem`.
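
   A condensed sketch of that criterion; it keeps only the JDK `BindException` 
(the diff additionally checks netty 3's `ChannelException`, which would need 
netty on the classpath):

{code:java}
import java.net.BindException;

final class BindRetryPolicy {

    private BindRetryPolicy() {}

    /** True if the failure is a port conflict, so the next candidate port should be tried. */
    static boolean isPortConflict(Throwable t) {
        return t.getCause() instanceof BindException;
    }
}
{code}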


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240859949
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator<Integer> portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
+   throw new IllegalArgumentException("Invalid 
port range definition: " + restBindPort);
+   }
+
+   int chosenPort = 0;
+   while (portsIterator.hasNext()) {
+   try {
+   chosenPort = portsIterator.next();
+   if (restBindAddress == null) {
+   channel = 
bootstrap.bind(chosenPort);
+   } else {
+   channel = 
bootstrap.bind(restBindAddress, chosenPort);
+   }
+   serverChannel = 
channel.syncUninterruptibly().channel();
+   break;
+   } catch (Exception e) {
+   // we can continue to try if this 
contains a netty channel exception
+   Throwable cause = e.getCause();
+   if (!(cause instanceof 
org.jboss.netty.channel.ChannelException ||
+   cause instanceof 
java.net.BindException)) {
+   throw e;
 
 Review comment:
   Here, we only retry when the exception is a port-conflict-specific 
exception. Otherwise we should rethrow it, so the caller knows that a 
different exception occurred. Also, see `BootstrapTools#startActorSystem`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718356#comment-16718356
 ] 

ASF GitHub Bot commented on FLINK-11083:


kisimple commented on issue #7267: [FLINK-11083][Table] 
CRowSerializerConfigSnapshot is not instantiable
URL: https://github.com/apache/flink/pull/7267#issuecomment-446432902
 
 
   cc @pnowojski @dawidwys


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CRowSerializerConfigSnapshot is not instantiable
> 
>
> Key: FLINK-11083
> URL: https://issues.apache.org/jira/browse/FLINK-11083
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL, Type Serialization System
>Reporter: boshu Zheng
>Assignee: boshu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> An exception was encountered when restarting a job with a savepoint in our 
> production env,
> {code:java}
> 2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task 
>   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> 
> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING 
> to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) 
> from any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
>   ... 5 more
> Caused by: java.lang.RuntimeException: The class 
> 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot'
>  is not instantiable: The class has no (implicit) public nullary constructor, 
> i.e. a constructor without arguments.
>   at 
> org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
>   at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
>   at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
>   at 
> 
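
For context, a minimal illustration of the constraint the restore path 
enforces here: a class instantiated reflectively via 
`InstantiationUtil.instantiate` needs a public no-argument constructor. The 
class below is hypothetical, not the actual CRowSerializer code:

{code:java}
class ExampleSerializerConfigSnapshot {

    private int version;

    /** Public nullary constructor, required for reflective instantiation on restore. */
    public ExampleSerializerConfigSnapshot() {}

    public ExampleSerializerConfigSnapshot(int version) {
        this.version = version;
    }

    public int getVersion() {
        return version;
    }
}
{code}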

[GitHub] kisimple commented on issue #7267: [FLINK-11083][Table] CRowSerializerConfigSnapshot is not instantiable

2018-12-11 Thread GitBox
kisimple commented on issue #7267: [FLINK-11083][Table] 
CRowSerializerConfigSnapshot is not instantiable
URL: https://github.com/apache/flink/pull/7267#issuecomment-446432902
 
 
   cc @pnowojski @dawidwys


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11130) Migrate flink-table runtime triggers class

2018-12-11 Thread shenlei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shenlei reassigned FLINK-11130:
---

Assignee: shenlei

> Migrate flink-table runtime triggers class
> --
>
> Key: FLINK-11130
> URL: https://issues.apache.org/jira/browse/FLINK-11130
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: shenlei
>Assignee: shenlei
>Priority: Major
>
> As discussed in FLINK-11065, this is a sub-task of FLINK-11065. This task 
> migrates the flink-table runtime StateCleaningCountTrigger class to the 
> flink-table-runtime module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718350#comment-16718350
 ] 

ASF GitHub Bot commented on FLINK-11081:


yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240857598
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator<Integer> portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
 
 Review comment:
   `getPortRangeFromString` also throws `IllegalConfigurationException`, which 
is an unchecked exception defined in `flink-core`, so here we catch a general 
`Exception` and rethrow a unified `IllegalArgumentException` from the catch 
block. The same code style is used in `BootstrapTools#startActorSystem`.
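
   A condensed sketch of that wrapping style (the class and method names are 
illustrative):

{code:java}
import org.apache.flink.util.NetUtils;

import java.util.Iterator;

class PortRangeParsing {

    static Iterator<Integer> parsePortRange(String restBindPort) {
        try {
            return NetUtils.getPortRangeFromString(restBindPort);
        } catch (Exception e) {
            // unify checked and unchecked parse failures into one exception type
            throw new IllegalArgumentException(
                "Invalid port range definition: " + restBindPort, e);
        }
    }
}
{code}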


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server

2018-12-11 Thread GitBox
yanghua commented on a change in pull request #7263: [FLINK-11081] Support 
binding port range for REST server
URL: https://github.com/apache/flink/pull/7263#discussion_r240857598
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 ##
 @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
.channel(NioServerSocketChannel.class)
.childHandler(initializer);
 
-   log.debug("Binding rest endpoint to {}:{}.", 
restBindAddress, restBindPort);
-   final ChannelFuture channel;
-   if (restBindAddress == null) {
-   channel = bootstrap.bind(restBindPort);
-   } else {
-   channel = bootstrap.bind(restBindAddress, 
restBindPort);
+   ChannelFuture channel;
+
+   // parse port range definition and create port iterator
+   Iterator<Integer> portsIterator;
+   try {
+   portsIterator = 
NetUtils.getPortRangeFromString(restBindPort);
+   } catch (Exception e) {
 
 Review comment:
   `getPortRangeFromString` also throws `IllegalConfigurationException`, which 
is an unchecked exception defined in `flink-core`, so here we catch a general 
`Exception` and rethrow a unified `IllegalArgumentException` from the catch 
block. The same code style is used in `BootstrapTools#startActorSystem`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718339#comment-16718339
 ] 

ASF GitHub Bot commented on FLINK-10566:


fhueske commented on a change in pull request #7276: [FLINK-10566] Fix 
exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855467
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of 
time.
+ */
+public class LargePlanTest {
+
+   @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, 
timeout = 15_000)
+   public void testPlanningOfLargePlan() throws Exception {
+   runProgram(new PreviewPlanEnvironment(), 10, 50);
+   }
+
+   private static void runProgram(ExecutionEnvironment env, int depth, int 
width) throws Exception {
+   DataSet<String> input = env.fromElements("a", "b", "c");
+   DataSet<String> stats = null;
+
+   for (int i = 0; i < depth; i++) {
+   stats = analyze(input, stats, width / (i + 1) + 1);
+   }
+
+   stats.output(new DiscardingOutputFormat<>());
+   env.execute("depth " + depth + " width " + width);
+   }
+
+   private static DataSet<String> analyze(DataSet<String> input, 
DataSet<String> stats, int branches) {
+   for (int i = 0; i < branches; i++) {
+   final int ii = i;
+
+   if (stats != null) {
+   input = input.map(new RichMapFunction<String, String>() {
+
+   @Override
+   public void open(Configuration parameters) 
throws Exception {
+   Collection<String> broadcastSet = 
getRuntimeContext().getBroadcastVariable("stats");
+   }
+
+   @Override
+   public String map(String value) throws 
Exception {
+   return value;
+   }
+   }).withBroadcastSet(stats.map(s -> "(" + s + 
").map"), "stats");
+   }
+
+   TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
+   new 
TupleTypeInfo<>(TypeInformation.of(Integer.class), 
TypeInformation.of(String.class));
+   DataSet<String> branch = input
+   .map(s -> new Tuple2<>(0, s + 
ii)).returns(typeInfo)
+   .groupBy(0)
+   .minBy(1)
+   .map(kv -> 
kv.f1).returns(TypeInformation.of(String.class));
 
 Review comment:
   `returns(Types.STRING)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
> 

[GitHub] fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs

2018-12-11 Thread GitBox
fhueske commented on a change in pull request #7276: [FLINK-10566] Fix 
exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855219
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of 
time.
+ */
+public class LargePlanTest {
+
+   @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, 
timeout = 15_000)
+   public void testPlanningOfLargePlan() throws Exception {
+   runProgram(new PreviewPlanEnvironment(), 10, 50);
+   }
+
+   private static void runProgram(ExecutionEnvironment env, int depth, int 
width) throws Exception {
+   DataSet<String> input = env.fromElements("a", "b", "c");
+   DataSet<String> stats = null;
+
+   for (int i = 0; i < depth; i++) {
+   stats = analyze(input, stats, width / (i + 1) + 1);
 
 Review comment:
   indent


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs

2018-12-11 Thread GitBox
fhueske commented on a change in pull request #7276: [FLINK-10566] Fix 
exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855467
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of 
time.
+ */
+public class LargePlanTest {
+
+   @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, 
timeout = 15_000)
+   public void testPlanningOfLargePlan() throws Exception {
+   runProgram(new PreviewPlanEnvironment(), 10, 50);
+   }
+
+   private static void runProgram(ExecutionEnvironment env, int depth, int 
width) throws Exception {
+   DataSet<String> input = env.fromElements("a", "b", "c");
+   DataSet<String> stats = null;
+
+   for (int i = 0; i < depth; i++) {
+   stats = analyze(input, stats, width / (i + 1) + 1);
+   }
+
+   stats.output(new DiscardingOutputFormat<>());
+   env.execute("depth " + depth + " width " + width);
+   }
+
+   private static DataSet<String> analyze(DataSet<String> input, 
DataSet<String> stats, int branches) {
+   for (int i = 0; i < branches; i++) {
+   final int ii = i;
+
+   if (stats != null) {
+   input = input.map(new RichMapFunction<String, String>() {
+
+   @Override
+   public void open(Configuration parameters) 
throws Exception {
+   Collection<String> broadcastSet = 
getRuntimeContext().getBroadcastVariable("stats");
+   }
+
+   @Override
+   public String map(String value) throws 
Exception {
+   return value;
+   }
+   }).withBroadcastSet(stats.map(s -> "(" + s + 
").map"), "stats");
+   }
+
+   TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
+   new 
TupleTypeInfo<>(TypeInformation.of(Integer.class), 
TypeInformation.of(String.class));
+   DataSet<String> branch = input
+   .map(s -> new Tuple2<>(0, s + 
ii)).returns(typeInfo)
+   .groupBy(0)
+   .minBy(1)
+   .map(kv -> 
kv.f1).returns(TypeInformation.of(String.class));
 
 Review comment:
   `returns(Types.STRING)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718338#comment-16718338
 ] 

ASF GitHub Bot commented on FLINK-10566:


fhueske commented on a change in pull request #7276: [FLINK-10566] Fix 
exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855219
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of 
time.
+ */
+public class LargePlanTest {
+
+   @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, 
timeout = 15_000)
+   public void testPlanningOfLargePlan() throws Exception {
+   runProgram(new PreviewPlanEnvironment(), 10, 50);
+   }
+
+   private static void runProgram(ExecutionEnvironment env, int depth, int 
width) throws Exception {
+   DataSet<String> input = env.fromElements("a", "b", "c");
+   DataSet<String> stats = null;
+
+   for (int i = 0; i < depth; i++) {
+   stats = analyze(input, stats, width / (i + 1) + 1);
 
 Review comment:
   indent


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Robert Bradshaw
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up, and past, 50). On 
> Flink it seems getting width beyond width 10 is problematic (times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> input = env.fromElements("a", "b", "c");
> DataSet<String> stats = null;
> for (int i = 0; i < depth; i++) {
>   stats = analyze(input, stats, width / (i + 1) + 1);
> }
> stats.writeAsText("out.txt");
> env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, 
> DataSet<String> stats, int branches) {
> System.out.println("analyze " + branches);
> for (int i = 0; i < branches; i++) {
>   final int ii = i;
>   if (stats != null) {
> input = input.map(new RichMapFunction<String, String>() {
> @Override
> public void open(Configuration parameters) throws Exception {
>   Collection 

[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718336#comment-16718336
 ] 

ASF GitHub Bot commented on FLINK-10566:


fhueske commented on a change in pull request #7276: [FLINK-10566] Fix 
exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855139
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of 
time.
+ */
+public class LargePlanTest {
+
+   @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, 
timeout = 15_000)
+   public void testPlanningOfLargePlan() throws Exception {
+   runProgram(new PreviewPlanEnvironment(), 10, 50);
+   }
+
+   private static void runProgram(ExecutionEnvironment env, int depth, int 
width) throws Exception {
+   DataSet<String> input = env.fromElements("a", "b", "c");
+   DataSet<String> stats = null;
+
+   for (int i = 0; i < depth; i++) {
+   stats = analyze(input, stats, width / (i + 1) + 1);
+   }
+
+   stats.output(new DiscardingOutputFormat<>());
+   env.execute("depth " + depth + " width " + width);
+   }
+
+   private static DataSet<String> analyze(DataSet<String> input, 
DataSet<String> stats, int branches) {
+   for (int i = 0; i < branches; i++) {
+   final int ii = i;
+
+   if (stats != null) {
+   input = input.map(new RichMapFunction<String, String>() {
+
+   @Override
+   public void open(Configuration parameters) 
throws Exception {
+   Collection<String> broadcastSet = 
getRuntimeContext().getBroadcastVariable("stats");
+   }
+
+   @Override
+   public String map(String value) throws 
Exception {
+   return value;
+   }
+   }).withBroadcastSet(stats.map(s -> "(" + s + 
").map"), "stats");
+   }
+
+   TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
+   new 
TupleTypeInfo<>(TypeInformation.of(Integer.class), 
TypeInformation.of(String.class));
+   DataSet<String> branch = input
+   .map(s -> new Tuple2<>(0, s + 
ii)).returns(typeInfo)
 
 Review comment:
   can be simplified to `.returns(Types.TUPLE(Types.INT, Types.STRING))`
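
   A small sketch of the shorthand, assuming 
`org.apache.flink.api.common.typeinfo.Types`; the order matches the 
`Tuple2<Integer, String>` produced by the map:

{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

class TypesShorthand {

    static DataSet<String> minPerKey(DataSet<String> input) {
        return input
            .map(s -> new Tuple2<>(0, s))
            // replaces the explicit TupleTypeInfo construction in the test
            .returns(Types.TUPLE(Types.INT, Types.STRING))
            .groupBy(0)
            .minBy(1)
            .map(kv -> kv.f1)
            .returns(Types.STRING);
    }
}
{code}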


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Robert Bradshaw
>Assignee: 

[GitHub] fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs

2018-12-11 Thread GitBox
fhueske commented on a change in pull request #7276: [FLINK-10566] Fix 
exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855139
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of 
time.
+ */
+public class LargePlanTest {
+
+   @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, 
timeout = 15_000)
+   public void testPlanningOfLargePlan() throws Exception {
+   runProgram(new PreviewPlanEnvironment(), 10, 50);
+   }
+
+   private static void runProgram(ExecutionEnvironment env, int depth, int 
width) throws Exception {
+   DataSet<String> input = env.fromElements("a", "b", "c");
+   DataSet<String> stats = null;
+
+   for (int i = 0; i < depth; i++) {
+   stats = analyze(input, stats, width / (i + 1) + 1);
+   }
+
+   stats.output(new DiscardingOutputFormat<>());
+   env.execute("depth " + depth + " width " + width);
+   }
+
+   private static DataSet<String> analyze(DataSet<String> input, 
DataSet<String> stats, int branches) {
+   for (int i = 0; i < branches; i++) {
+   final int ii = i;
+
+   if (stats != null) {
+   input = input.map(new RichMapFunction<String, String>() {
+
+   @Override
+   public void open(Configuration parameters) 
throws Exception {
+   Collection<String> broadcastSet = 
getRuntimeContext().getBroadcastVariable("stats");
+   }
+
+   @Override
+   public String map(String value) throws 
Exception {
+   return value;
+   }
+   }).withBroadcastSet(stats.map(s -> "(" + s + 
").map"), "stats");
+   }
+
+   TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
+   new 
TupleTypeInfo<>(TypeInformation.of(Integer.class), 
TypeInformation.of(String.class));
+   DataSet<String> branch = input
+   .map(s -> new Tuple2<>(0, s + 
ii)).returns(typeInfo)
 
 Review comment:
   can be simplified to `.returns(Types.TUPLE(Types.INT, Types.STRING))`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

