[jira] [Updated] (FLINK-8739) Optimize runtime support for distinct filter
[ https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8739: -- Labels: pull-request-available (was: ) > Optimize runtime support for distinct filter > > > Key: FLINK-8739 > URL: https://issues.apache.org/jira/browse/FLINK-8739 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Possible optimizations: > 1. Decouple the distinct map and the actual accumulator so that they can separately > be created in codegen. > 2. Reuse the same distinct accumulator for filtering, e.g. `SELECT > COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dianfu opened a new pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible
dianfu opened a new pull request #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible URL: https://github.com/apache/flink/pull/7286 ## What is the purpose of the change *This pull request optimizes DISTINCT aggregates such as `SELECT COUNT(DISTINCT(a)), SUM(DISTINCT(a))` to use the same distinct accumulator for performance optimization.* ## Brief change log - *Separate DistinctAccumulator and the actual accumulator* - *Optimize the AggregateCodeGeneration to reuse the same DistinctAccumulator if possible* ## Verifying this change This change is already covered by existing tests, such as *SqlITCase#testDistinctAggOnRowTimeTumbleWindow, AggregateITCase, OverWindowITCase*. Still missing: For the harness test, I will add more harness tests based on #7253 ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
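The two optimizations quoted in FLINK-8739 (decoupling the distinct map from the actual accumulator, and sharing one distinct map across `COUNT(DISTINCT(a))` and `SUM(DISTINCT(a))`) can be illustrated with a plain-Java sketch. This is not Flink's generated code; all names here (`SharedDistinctAccumulator`, the `long[]` accumulator slots) are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: one shared distinct map guarding several accumulators.
class SharedDistinctAccumulator {
    // value -> occurrence count; only the 0 -> 1 transition triggers forwarding
    private final Map<Long, Long> seen = new HashMap<>();
    private final List<long[]> accumulators = new ArrayList<>();
    private final List<BiConsumer<long[], Long>> accumulateFns = new ArrayList<>();

    // Attach an accumulator, decoupled from the distinct map (item 1 above).
    void register(long[] acc, BiConsumer<long[], Long> fn) {
        accumulators.add(acc);
        accumulateFns.add(fn);
    }

    // Forward the value to the real accumulators only on its first occurrence,
    // so COUNT(DISTINCT a) and SUM(DISTINCT a) share one map (item 2 above).
    void accumulate(long value) {
        if (seen.merge(value, 1L, Long::sum) == 1L) {
            for (int i = 0; i < accumulators.size(); i++) {
                accumulateFns.get(i).accept(accumulators.get(i), value);
            }
        }
    }
}
```

Because both aggregates consult the same `seen` map, each distinct value is forwarded exactly once, which is the saving the JIRA describes.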
[jira] [Closed] (FLINK-8838) Add Support for UNNEST a MultiSet type field
[ https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln.lee closed FLINK-8838. -- > Add Support for UNNEST a MultiSet type field > > > Key: FLINK-8838 > URL: https://issues.apache.org/jira/browse/FLINK-8838 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Major > Fix For: 1.6.0 > > > MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST supports the Array type > only, so it would be nice to support UNNEST on a MultiSet type field.
[jira] [Created] (FLINK-11138) RocksDB's WAL seems to be useless in the RocksDB state backend; disabling it would reduce disk overhead
cailiuyang created FLINK-11138: -- Summary: RocksDB's WAL seems to be useless in the RocksDB state backend; disabling it would reduce disk overhead Key: FLINK-11138 URL: https://issues.apache.org/jira/browse/FLINK-11138 Project: Flink Issue Type: Improvement Reporter: cailiuyang The WAL of RocksDB is used for recovery after a crash, but after a Flink job crashes, it always recovers from the last checkpoint, so the WAL seems useless. Am I right?
[jira] [Closed] (FLINK-5511) Add support for outer joins with local predicates
[ https://issues.apache.org/jira/browse/FLINK-5511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln.lee closed FLINK-5511. -- > Add support for outer joins with local predicates > - > > Key: FLINK-5511 > URL: https://issues.apache.org/jira/browse/FLINK-5511 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently the test case in > flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala > will throw a ValidationException indicating: “Invalid non-join predicate 'b < > 3. For non-join predicates use Table#where.” > {code:title=JoinITCase.scala} > @Test(expected = classOf[ValidationException]) > def testNoJoinCondition(): Unit = { > … > val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, > 'c) > val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, > 'f, 'g, 'h) > val joinT = ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g) > } > {code} > This JIRA aims to support this kind of local predicate in outer joins. > More detailed description: http://goo.gl/gK6vP3
[jira] [Created] (FLINK-11137) Unexpected RegistrationTimeoutException of TaskExecutor
Biao Liu created FLINK-11137: Summary: Unexpected RegistrationTimeoutException of TaskExecutor Key: FLINK-11137 URL: https://issues.apache.org/jira/browse/FLINK-11137 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 1.7.0 Reporter: Biao Liu Assignee: Biao Liu There is a race condition in {{TaskExecutor}} between starting the registration to the RM and checking the registration timeout. Currently we start the RM leader retriever first, and then start the registration timeout check. If registration is fast enough, it may finish before the timeout check is started, and the timeout check will then erroneously fail the TaskExecutor later. A stack trace of the exception is below: {quote}2018-11-05 14:16:52,464 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@/user/taskmanager_0. org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration 30 ms. This indicates a problem with this instance. Terminating now. 
at org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$4(TaskExecutor.java:1096) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) {quote}
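The race described in FLINK-11137 can be illustrated with a small guard: the fix direction is to make the timeout callback a no-op once registration has completed, so the relative order of the two events no longer matters. A hypothetical sketch, not the actual `TaskExecutor` code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard: once registration succeeds, a later-firing timeout
// check becomes a no-op, removing the race between the two events.
class RegistrationTimeoutGuard {
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private boolean fatalErrorRaised = false;

    void onRegistrationSuccess() {
        registered.set(true);
    }

    // Scheduled once, after the configured maximum registration duration.
    void onRegistrationTimeout() {
        if (!registered.get()) {
            // In the real TaskExecutor this is where the fatal
            // RegistrationTimeoutException would be raised.
            fatalErrorRaised = true;
        }
    }

    boolean fatalErrorRaised() {
        return fatalErrorRaised;
    }
}
```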
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718562#comment-16718562 ] ASF GitHub Bot commented on FLINK-11081: yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240905696 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator<Integer> portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { Review comment: OK, thank you for your clarification. It seems that I misunderstood what you meant. I thought you meant we should capture Exception and could not catch IllegalConfigurationException. Now it seems that what you mean is that we should break the capture down into particular exceptions. Considering that `NetUtils.getPortRangeFromString` will throw at least two possible exceptions, one checked and one unchecked, I think we should capture Exception directly.
> Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be > useful to be able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add, similar to > {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which > allows specifying a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
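For illustration, here is a simplified, hypothetical stand-in for `NetUtils.getPortRangeFromString` showing the parsing the PR relies on: the option accepts either a single port ("8081") or an inclusive range ("50100-50103"). This sketch is not Flink's actual implementation:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for NetUtils.getPortRangeFromString.
class PortRange {
    static Iterator<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        int dash = spec.indexOf('-');
        if (dash < 0) {
            // single port, e.g. "8081"
            ports.add(checkPort(Integer.parseInt(spec.trim())));
        } else {
            // inclusive range, e.g. "50100-50103"
            int start = checkPort(Integer.parseInt(spec.substring(0, dash).trim()));
            int end = checkPort(Integer.parseInt(spec.substring(dash + 1).trim()));
            if (start > end) {
                throw new IllegalArgumentException("Invalid port range: " + spec);
            }
            for (int p = start; p <= end; p++) {
                ports.add(p);
            }
        }
        return ports.iterator();
    }

    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }
}
```

The server endpoint would then iterate these candidates, attempting to bind on each one and keeping the first successful bind.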
[GitHub] TisonKun opened a new pull request #7285: [hotfix] [resource manager] Remove legacy(unused) class ResourceManag…
TisonKun opened a new pull request #7285: [hotfix] [resource manager] Remove legacy (unused) class ResourceManag… URL: https://github.com/apache/flink/pull/7285 …erServices ## What is the purpose of the change We introduced `ResourceManagerService` in 9d1b5fb7ab2b1dcb327d026d367bbd7dc3e66f94. As the code evolved, its functions were covered by `ResourceActions` (for `allocateResource`) and `FencedRpcEndpoint` (via `getFencingToken`, `getMainThreadExecutor` and `runAsync*`). Now no code depends on `ResourceManagerService` and its functions are superseded, so we can safely remove it as a clean-up. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**not applicable**) cc @zentol @StefanRRichter
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718528#comment-16718528 ] ASF GitHub Bot commented on FLINK-11081: walterddr commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240900279 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator<Integer> portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { Review comment: What I meant is "should we catch `RuntimeException`s?" The function signature of `startActorSystem` explicitly throws a generic `Exception`, so there's no way to distinguish in the `BootstrapTools` use case. But here, since we are directly using `NetUtil.getPor...`, we might be able to do a better job. Regarding whether we should catch it, I am fine with either decision as long as they are properly documented.
> Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be > useful to be able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add, similar to > {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which > allows specifying a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
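The trade-off walterddr raises — a blanket `catch (Exception e)` versus catching the specific failure types — can be shown with a toy validator. Note that `NumberFormatException` is a subclass of `IllegalArgumentException`, so the narrower catch must come first; all names here are hypothetical:

```java
// Toy validator contrasting specific catches with a blanket catch (Exception).
// NumberFormatException extends IllegalArgumentException, so it is caught first.
class PortSpecValidator {
    static String describeFailure(String spec) {
        try {
            int port = Integer.parseInt(spec);
            if (port < 0 || port > 65535) {
                throw new IllegalArgumentException("out of range: " + port);
            }
            return "ok";
        } catch (NumberFormatException e) {
            return "malformed";   // unchecked: not a number at all
        } catch (IllegalArgumentException e) {
            return "invalid";     // unchecked: a number, but not a valid port
        }
        // A blanket `catch (Exception e)` would collapse both cases into one.
    }
}
```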
[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java
[ https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718513#comment-16718513 ] sunjincheng commented on FLINK-11067: - We list the two ways users use BatchTableEnvironment, as follows: 1. Using api.scala.BatchTableEnvironment {code:java} import org.apache.flink.table.api.scala.BatchTableEnvironment BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); {code} 2. Using api.java.BatchTableEnvironment {code:java} import org.apache.flink.table.api.java.BatchTableEnvironment BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); {code} According to the current design, `flink-table-api-java`/`flink-table-api-scala` depend on `flink-table-api-base`, so `api.java.BatchTableEnvironment` and `api.scala.BatchTableEnvironment` cannot be imported in `flink-table-api-base`. So I share [~dian.fu]'s question: which package does `BatchTableEnvironment` belong to in the `getTableEnvironment()` definition? {code:java} static BatchTableEnvironment getTableEnvironment(BatchEnvironment env); {code} Bests, Jincheng > Port TableEnvironments to Java > -- > > Key: FLINK-11067 > URL: https://issues.apache.org/jira/browse/FLINK-11067 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Dawid Wysakowicz >Priority: Major > > This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, > {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided > and discussed. Some refactoring and clean up might be necessary.
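The layering problem sunjincheng describes — a factory living in the base module cannot name the concrete `api.java`/`api.scala` environments at compile time without a cyclic dependency — is commonly worked around by loading the implementation class reflectively. A hypothetical sketch of that approach only, not necessarily the design chosen in this thread:

```java
// Hypothetical sketch: a base-module factory that cannot reference the
// concrete java/scala environment classes at compile time loads the
// implementation by name instead. Not Flink's actual API.
class TableEnvironmentFactory {
    static Object create(String implClassName) {
        try {
            // The base module only knows the class name, not the type.
            return Class.forName(implClassName).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Implementation not on the classpath: " + implClassName, e);
        }
    }
}
```

Callers in the `java`/`scala` modules would cast the result to their own `BatchTableEnvironment` type, which keeps the dependency arrow pointing from the concrete modules to the base module.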
[jira] [Commented] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
[ https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718512#comment-16718512 ] ASF GitHub Bot commented on FLINK-11107: Myasuka edited a comment on issue #7281: [FLINK-11107][state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured URL: https://github.com/apache/flink/pull/7281#issuecomment-446297350 @StephanEwen , since you have left the comments below: ~~~ to keep supporting the old behavior where default (JobManager) Backend + HA mode = checkpoints in HA store we add the HA persistence dir as the checkpoint directory if none other is set ~~~ I'm wondering whether this keeps the same behavior as before. For Flink 1.3, (JobManager) backend + HA mode only creates the `completedCheckpoint` file under the HA folder. On the other hand, for Flink 1.6, this would create an additional `job-id/chk-x/_metadata` besides the completedCheckpoint file. Please correct me if I am wrong. > [state] Avoid memory stateBackend to create arbitrary folders under HA path > when no checkpoint path configured > -- > > Key: FLINK-11107 > URL: https://issues.apache.org/jira/browse/FLINK-11107 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.2, 1.7.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.1 > > > Currently, the memory state-backend would create a folder named with a random UUID > under the HA directory if no checkpoint path is ever configured. 
(the code logic > locates within {{StateBackendLoader#fromApplicationOrConfigOrDefault}}) > However, the default memory state-backend would not only be created on the JM > side, but also on each task manager's side, which means many folders with > random UUIDs would be created under the HA directory. It would result in an exception > like: > {noformat} > The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 > items=1048576{noformat} > If this happens, no new jobs can be submitted until we clean up those > directories manually.
[jira] [Commented] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured
[ https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718511#comment-16718511 ] ASF GitHub Bot commented on FLINK-11107: Myasuka commented on issue #7281: [FLINK-11107][state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured URL: https://github.com/apache/flink/pull/7281#issuecomment-446478434 BTW, if we do not create UUID directories for the memory state-backend in this situation, the job can still restore from high-availability storage. The only difference is that the `Latest Restore` information under the `Checkpoints` tab of the web UI would show the path is ``. > [state] Avoid memory stateBackend to create arbitrary folders under HA path > when no checkpoint path configured > -- > > Key: FLINK-11107 > URL: https://issues.apache.org/jira/browse/FLINK-11107 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.2, 1.7.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.1 > > > Currently, the memory state-backend would create a folder named with a random UUID > under the HA directory if no checkpoint path is ever configured. (the code logic > locates within {{StateBackendLoader#fromApplicationOrConfigOrDefault}}) > However, the default memory state-backend would not only be created on the JM > side, but also on each task manager's side, which means many folders with > random UUIDs would be created under the HA directory. 
It would result in an exception > like: > {noformat} > The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 > items=1048576{noformat} > If this happens, no new jobs can be submitted until we clean up those > directories manually.
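The fallback under discussion in FLINK-11107 can be sketched as follows: when no checkpoint path is configured, each process derives its own random subdirectory under the HA path, so every JM and every TM start contributes one more folder, which is what floods the HA directory. Hypothetical names only; the real logic lives in `StateBackendLoader#fromApplicationOrConfigOrDefault`:

```java
import java.util.UUID;

// Hypothetical sketch of the problematic fallback: with no checkpoint path
// configured, every calling process gets its own random folder under HA.
class CheckpointPathResolver {
    static String resolve(String configuredCheckpointPath, String haPath) {
        if (configuredCheckpointPath != null) {
            return configuredCheckpointPath;
        }
        // Each caller (JM and every TM) generates a different UUID,
        // so each process adds one more directory under the HA path.
        return haPath + "/checkpoints/" + UUID.randomUUID();
    }
}
```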
[jira] [Assigned] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-4810: --- Assignee: vinoyang > Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful > checkpoints > > > Key: FLINK-4810 > URL: https://issues.apache.org/jira/browse/FLINK-4810 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The Checkpoint coordinator should track the number of consecutive > unsuccessful checkpoints. > If more than {{n}} (configured value) checkpoints fail in a row, it should > call {{fail()}} on the execution graph to trigger a recovery.
[GitHub] ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints URL: https://github.com/apache/flink/pull/3334#issuecomment-446471220 Closing the PR as per request.
[GitHub] ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints URL: https://github.com/apache/flink/pull/3334

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

```diff
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d9aea..9f453d0f2c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -132,6 +132,8 @@
 	/** The maximum number of checkpoints that may be in progress at the same time */
 	private final int maxConcurrentCheckpointAttempts;
+	/** The maximum number of unsuccessful checkpoints */
+	private final int maxFailedCheckpoints;
 	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
 	private final Timer timer;
@@ -142,6 +144,9 @@
 	/** The number of consecutive failed trigger attempts */
 	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
+	/** The number of consecutive failed checkpoints */
+	private final AtomicInteger numFailedCheckpoints = new AtomicInteger(0);
+
 	private ScheduledTrigger currentPeriodicTrigger;
 	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
@@ -163,6 +168,23 @@
 	private CheckpointStatsTracker statsTracker;
 	//
+	public CheckpointCoordinator(
+			JobID job,
+			long baseInterval,
+			long checkpointTimeout,
+			long minPauseBetweenCheckpoints,
+			int maxConcurrentCheckpointAttempts,
+			ExternalizedCheckpointSettings externalizeSettings,
+			ExecutionVertex[] tasksToTrigger,
+			ExecutionVertex[] tasksToWaitFor,
+			ExecutionVertex[] tasksToCommitTo,
+			CheckpointIDCounter checkpointIDCounter,
+			CompletedCheckpointStore completedCheckpointStore,
+			String checkpointDirectory,
+			Executor executor) {
+		this(job, baseInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpointAttempts, 0, externalizeSettings, tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+			checkpointIDCounter, completedCheckpointStore, checkpointDirectory, executor);
+	}
 	public CheckpointCoordinator(
 		JobID job,
@@ -170,6 +192,7 @@ public CheckpointCoordinator(
 		long checkpointTimeout,
 		long minPauseBetweenCheckpoints,
 		int maxConcurrentCheckpointAttempts,
+		int maxFailedCheckpoints,
 		ExternalizedCheckpointSettings externalizeSettings,
 		ExecutionVertex[] tasksToTrigger,
 		ExecutionVertex[] tasksToWaitFor,
@@ -184,6 +207,7 @@ public CheckpointCoordinator(
 		checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
+		checkArgument(maxFailedCheckpoints >= 0, "maxFailedCheckpoints must be >= 0");
 		if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
 			throw new IllegalStateException("CheckpointConfig says to persist periodic " +
@@ -207,6 +231,7 @@ public CheckpointCoordinator(
 		this.checkpointTimeout = checkpointTimeout;
 		this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
 		this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
+		this.maxFailedCheckpoints = maxFailedCheckpoints;
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
 		this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -461,6 +486,9 @@ CheckpointTriggerResult triggerCheckpoint(
 	catch (Throwable t) {
 		int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 		LOG.warn("Failed to
```
[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718491#comment-16718491 ] ASF GitHub Bot commented on FLINK-4810: ramkrish86 closed pull request #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints URL: https://github.com/apache/flink/pull/3334
[GitHub] lamber-ken removed a comment on issue #7279: [hotfix] add hadoop user name to yarn logs command
lamber-ken removed a comment on issue #7279: [hotfix] add hadoop user name to yarn logs command URL: https://github.com/apache/flink/pull/7279#issuecomment-446261251 hi, @zentol, cc :)
[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore
[ https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718478#comment-16718478 ] ASF GitHub Bot commented on FLINK-11048:

tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r240885619

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java

```diff
@@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
 		}
 	}
-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph();
-		streamGraph.setJobName(jobName);
-		transformations.clear();
-		return executeRemotely(streamGraph, jarFiles);
+	/**
+	 * Set savepoint restore settings that will be used when executing the job.
+	 * @param savepointRestoreSettings savepoint restore settings
+	 */
+	public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
+		this.savepointRestoreSettings = savepointRestoreSettings;
```

Review comment: Other than the job name, all other execution parameters already come from the "environment". To me this is not a big deal as long as we don't introduce even more clutter than there already is. Added another execute method instead. PTAL

> Ability to programmatically execute streaming pipeline with savepoint restore
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.7.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Minor
> Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, though the underlying ClusterClient does. Add an explicit "execute remotely" that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
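The API shape under discussion — storing savepoint restore settings on the environment and applying them at submission time — can be sketched roughly as below. All names here (`SavepointSettings`, `SketchRemoteEnvironment`, `describeSubmission`) are illustrative stand-ins for Flink's real `SavepointRestoreSettings`/`RemoteStreamEnvironment` API, not the actual implementation.

```java
/** Stand-in for Flink's SavepointRestoreSettings: either "no restore"
 *  or "restore from this path". */
class SavepointSettings {
    final String savepointPath;          // null means fresh start
    final boolean allowNonRestoredState;

    private SavepointSettings(String path, boolean allow) {
        this.savepointPath = path;
        this.allowNonRestoredState = allow;
    }

    static SavepointSettings none() { return new SavepointSettings(null, false); }

    static SavepointSettings forPath(String path, boolean allow) {
        return new SavepointSettings(path, allow);
    }
}

/** Stand-in for the environment: the settings are carried as state and
 *  consulted when the job is submitted. */
class SketchRemoteEnvironment {
    private SavepointSettings savepointSettings = SavepointSettings.none();

    void setSavepointRestoreSettings(SavepointSettings s) {
        this.savepointSettings = s;
    }

    /** At submission time the stored settings travel with the job. */
    String describeSubmission(String jobName) {
        return savepointSettings.savepointPath == null
            ? jobName + " (fresh start)"
            : jobName + " (restore from " + savepointSettings.savepointPath + ")";
    }
}
```

The design question in the review is exactly where this state lives: a mutable setter on the environment (as above) versus an extra `execute` overload that takes the settings as a parameter.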
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718466#comment-16718466 ] ASF GitHub Bot commented on FLINK-11081:

yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#issuecomment-446460550

`ConfigOption.PORT`? I cannot find it. I guess what you want to express is `RestOptions.PORT`. I think it can't be deprecated here. Initially its responsibilities covered both the binding port of the server and the connection port of the client. Now that the server's binding port is represented by `BIND_PORT`, `PORT` can still be used to represent the client's connection port. It may be used in places such as:

```java
private void createClientConfiguration(URI restAddress) {
	Configuration restClientConfig = new Configuration();
	restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
	restClientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
	this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
}
```

There should be two pairs of configuration items here: `BIND_ADDRESS`/`BIND_PORT` and `ADDRESS`/`PORT`.

> Support binding port range for REST server
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
> Issue Type: Improvement
> Components: REST
> Affects Versions: 1.7.0, 1.8.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be useful to be able to specify not only a single port but a port range to pick a port from. Therefore, I propose to add, similar to {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which allows specifying a port range for the {{RestServerEndpoint}} to pick a port from. {{RestOptions#PORT}} would then only be used by the client to connect to the started {{RestServerEndpoint}}.
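The two-pair scheme described in the comment — the server binds via `BIND_ADDRESS`/`BIND_PORT` (where the bind port may be a range), clients connect via `ADDRESS`/`PORT`, and the bind port falls back to the client-facing port when unset — might look like this in miniature. The string keys and class name are illustrative assumptions; Flink's real options live in `RestOptions`.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of resolving server-bind vs. client-connect ports from a flat config. */
class RestConfigSketch {
    private final Map<String, String> conf = new HashMap<>();

    void set(String key, String value) {
        conf.put(key, value);
    }

    /** Server side: prefer the explicit bind port (a range is allowed here),
     *  falling back to the client-facing port for backwards compatibility. */
    String serverBindPort() {
        String bind = conf.get("rest.bind-port");
        return bind != null ? bind : conf.getOrDefault("rest.port", "8081");
    }

    /** Client side: always the single connection port. */
    int clientPort() {
        return Integer.parseInt(conf.getOrDefault("rest.port", "8081"));
    }
}
```

The key property is that setting a bind-port range never changes what the client connects to, which is why the comment argues `PORT` cannot simply be deprecated.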
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718464#comment-16718464 ] ASF GitHub Bot commented on FLINK-11081:

yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240880017

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java

```diff
@@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
 			.channel(NioServerSocketChannel.class)
 			.childHandler(initializer);
-		log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-		final ChannelFuture channel;
-		if (restBindAddress == null) {
-			channel = bootstrap.bind(restBindPort);
-		} else {
-			channel = bootstrap.bind(restBindAddress, restBindPort);
+		ChannelFuture channel;
+
+		// parse port range definition and create port iterator
+		Iterator portsIterator;
+		try {
+			portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+		} catch (Exception e) {
+			throw new IllegalArgumentException("Invalid port range definition: " + restBindPort);
+		}
+
+		int chosenPort = 0;
+		while (portsIterator.hasNext()) {
+			try {
+				chosenPort = portsIterator.next();
+				if (restBindAddress == null) {
+					channel = bootstrap.bind(chosenPort);
+				} else {
+					channel = bootstrap.bind(restBindAddress, chosenPort);
+				}
+				serverChannel = channel.syncUninterruptibly().channel();
+				break;
+			} catch (Exception e) {
+				// we can continue to try if this contains a netty channel exception
+				Throwable cause = e.getCause();
+				if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+						cause instanceof java.net.BindException)) {
+					throw e;
```

Review comment: Yes, this expression may be misunderstood; I will update it later. Regarding the exception-handling logic here: in a recent test it turned out to be a real problem. We should not call

```java
Throwable cause = e.getCause();
```

because it will return a `null` value. I will fix it later.
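The bind loop in the diff boils down to two steps: expand the port-range string, then try each candidate port until a bind succeeds. A self-contained sketch under stated assumptions — a plain `ServerSocket` stands in for Netty's bootstrap, and the hand-rolled `parse` helper stands in for what `NetUtils.getPortRangeFromString` provides in Flink:

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

/** Sketch: expand "8080" or "50100-50103" into candidate ports and bind. */
class PortRangeSketch {
    /** Expand a single port or an inclusive "start-end" range. */
    static List<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        int dash = spec.indexOf('-');
        if (dash < 0) {
            ports.add(Integer.parseInt(spec.trim()));
        } else {
            int start = Integer.parseInt(spec.substring(0, dash).trim());
            int end = Integer.parseInt(spec.substring(dash + 1).trim());
            for (int p = start; p <= end; p++) {
                ports.add(p);
            }
        }
        return ports;
    }

    /** Try each candidate; a BindException means "occupied, try the next". */
    static ServerSocket bindFirstFree(String spec) throws IOException {
        for (int port : parse(spec)) {
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                // port taken: fall through to the next candidate
            }
        }
        throw new BindException("No free port in range: " + spec);
    }
}
```

Only bind failures are swallowed here; any other `IOException` propagates, which mirrors the review's point that the retry loop must not hide unrelated errors.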
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718462#comment-16718462 ] ASF GitHub Bot commented on FLINK-11081:

yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240880004

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java

```diff
@@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) {
 			.channel(NioServerSocketChannel.class)
 			.childHandler(initializer);
-		log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
-		final ChannelFuture channel;
-		if (restBindAddress == null) {
-			channel = bootstrap.bind(restBindPort);
-		} else {
-			channel = bootstrap.bind(restBindAddress, restBindPort);
+		ChannelFuture channel;
+
+		// parse port range definition and create port iterator
+		Iterator portsIterator;
+		try {
+			portsIterator = NetUtils.getPortRangeFromString(restBindPort);
+		} catch (Exception e) {
```

Review comment: Here we catch `Exception`, so both `NumberFormatException` and `IllegalConfigurationException` will be caught, won't they? Or did I misunderstand what you mean? Do you want me to catch a more specific exception?
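The `getCause()` concern raised in this review thread is easy to demonstrate: an exception constructed without a wrapped cause returns `null` from `getCause()`, so inspecting only the cause misses a directly-thrown `BindException`. Walking the cause chain starting from the exception itself handles both cases; `isBindFailure` below is an illustrative helper for the example, not Flink code.

```java
import java.net.BindException;

/** Demonstrates why checking only e.getCause() is fragile. */
class CauseCheckSketch {
    /** Walk the exception and its cause chain looking for a bind failure. */
    static boolean isBindFailure(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof BindException) {
                return true;
            }
        }
        return false;
    }
}
```

A check written as `e.getCause() instanceof BindException` would return false for a bare `BindException`, since its cause is `null`; the chain walk above covers both the wrapped and the direct case.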
[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server
yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240880017 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid port range definition: " + restBindPort); + } + + int chosenPort = 0; + while (portsIterator.hasNext()) { + try { + chosenPort = portsIterator.next(); + if (restBindAddress == null) { + channel = bootstrap.bind(chosenPort); + } else { + channel = bootstrap.bind(restBindAddress, chosenPort); + } + serverChannel = channel.syncUninterruptibly().channel(); + break; + } catch (Exception e) { + // we can continue to try if this contains a netty channel exception + Throwable cause = e.getCause(); + if (!(cause instanceof org.jboss.netty.channel.ChannelException || + cause instanceof java.net.BindException)) { + throw e; Review comment: Yes, this expression may be misunderstood, I will update it later. Regarding the exception handling logic here, in the recent test, it is really a problem, we should not call it again. ``` Throwable cause = e.getCause(); ``` It will get a `null` value. I will fix it later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
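The thread above points out that `e.getCause()` can be `null`, in which case the `instanceof` check on the cause wrongly rethrows a plain `BindException` instead of letting the loop try the next port. A self-contained, JDK-only sketch (not Flink's actual `RestServerEndpoint` code) of a retry loop that inspects both the exception itself and its cause:

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Iterator;

public class PortRangeBinder {

    /**
     * Tries to bind a ServerSocket to the first free port produced by the
     * iterator. Returns the bound socket, or throws if no port is free.
     */
    static ServerSocket bindToFirstFreePort(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress("localhost", port));
                return socket;
            } catch (IOException e) {
                socket.close();
                // check the exception itself, not only getCause(), which may be null
                if (!(e instanceof BindException || e.getCause() instanceof BindException)) {
                    throw e;
                }
                // port occupied: fall through and try the next candidate
            }
        }
        throw new BindException("No free port in the configured range");
    }
}
```

With this shape, a port that is already in use raises a bare `BindException` (no cause) and the loop still advances to the next candidate.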
[GitHub] yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server
yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240880004 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { Review comment: Here, we catch `Exception` so that regardless of `NumberFormatException` or `IllegalConfigurationException` will be caught, isn't it? Or did I misunderstand what you mean? Do you want me to catch a more specific exception? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
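The reviewers debate whether a blanket `catch (Exception e)` around `NetUtils.getPortRangeFromString` is appropriate. A standalone sketch of a hypothetical range parser (not Flink's actual `NetUtils`) that catches only the specific `NumberFormatException` and rewraps it with context:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PortRange {

    /** Parses "8080" or "8080-8090" into an iterator of candidate ports. */
    static Iterator<Integer> parse(String range) {
        try {
            List<Integer> ports = new ArrayList<>();
            int dash = range.indexOf('-');
            if (dash < 0) {
                ports.add(checkPort(Integer.parseInt(range.trim())));
            } else {
                int start = checkPort(Integer.parseInt(range.substring(0, dash).trim()));
                int end = checkPort(Integer.parseInt(range.substring(dash + 1).trim()));
                for (int p = start; p <= end; p++) {
                    ports.add(p);
                }
            }
            return ports.iterator();
        } catch (NumberFormatException e) {
            // wrap the specific parse failure instead of catching a blanket Exception
            throw new IllegalArgumentException("Invalid port range definition: " + range, e);
        }
    }

    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }
}
```

Catching only the narrow exception type keeps genuinely unexpected `RuntimeException`s (such as an `IllegalConfigurationException`) from being silently relabeled as bad user input.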
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718447#comment-16718447 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877554 ## File path: docs/dev/table/tableApi.md ## @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov {% top %} +### Aggregate + +Aggregate performs an aggregate operation with an aggregate function. You have to close the +"aggregate" with a select statement. The output will be flattened if the output type is a Review comment: Add a sentence that can't use an aggregate function in the select statement as it has been checked in the select. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Aggregate operator to Table API > --- > > Key: FLINK-10976 > URL: https://issues.apache.org/jira/browse/FLINK-10976 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add Aggregate operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab > .groupBy('a) // leave out groupBy-clause to define global aggregates > .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718457#comment-16718457 ] ASF GitHub Bot commented on FLINK-11010: walterddr commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446457485 > @walterddr could you please check the timezone problem when dealing with eventtime? > it seems that lamber-ken's commit only fixed proc time. > Would you please refer to my previous comment to take a look at my eventtime usecase. thanks a lot! > --update > i found the correct timestamp was changed in OutputRowtimeProcessFunction, using SqlFunctions.internalToTimestamp(), i'm not sure whether its redundant or not. > please fix this problem. thx FLINK-11010 only reports bugs for utilizing `currentProcessingTime()`. maybe we can mark the JIRA as duplicate of FLINK-8353 and focus on addressing the bigger overall timezone problem instead. what do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). 
> > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
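The arithmetic the issue describes can be reproduced in isolation. A minimal sketch (the zone is passed explicitly here so the behavior is deterministic; Flink's code uses `TimeZone.getDefault()`) showing that the wrapped `Timestamp` no longer carries the original epoch value, which is exactly why it diverges from `currentProcessingTime()`:

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampShiftDemo {

    /**
     * Mirrors the statement quoted in the issue:
     * new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));
     * The resulting epoch value is shifted by the zone offset.
     */
    static Timestamp shifted(long time, TimeZone zone) {
        return new Timestamp(time - zone.getOffset(time));
    }
}
```

For a zone at UTC+8 the stored epoch millis end up eight hours behind the value returned by `System.currentTimeMillis()`.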
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718456#comment-16718456 ] ASF GitHub Bot commented on FLINK-11081: walterddr commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240877630 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { Review comment: hmm. `IllegalConfigurationException` extends `RuntimeException`, shouldn't we not catch that? I am wondering reason `BootstrapTool` handles it that way was because the static `startActorSystem` method throws general exception in method signature. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. 
{{PORT}} is of type integer. Sometimes, it would be > useful to being able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add similar to > {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which > allows to specify a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
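As an illustration of the proposed split between `BIND_PORT` and `PORT`, the configuration might look like the sketch below (assuming the option keys `rest.bind-port` and `rest.bind-address`, matching the naming of the existing `RestOptions` fields; the exact keys are for illustration):

```yaml
# flink-conf.yaml (sketch): the server picks the first free port from a range,
# while clients keep using rest.port to connect.
rest.bind-address: 0.0.0.0
rest.bind-port: 50100-50200   # range the RestServerEndpoint binds from
rest.port: 8081               # port the client uses to reach the REST endpoint
```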
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718455#comment-16718455 ] ASF GitHub Bot commented on FLINK-11081: walterddr commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240877872 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid port range definition: " + restBindPort); + } + + int chosenPort = 0; + while (portsIterator.hasNext()) { + try { + chosenPort = portsIterator.next(); + if (restBindAddress == null) { + channel = bootstrap.bind(chosenPort); + } else { + channel = bootstrap.bind(restBindAddress, chosenPort); + } + serverChannel = channel.syncUninterruptibly().channel(); + break; + } catch (Exception e) { + // we can continue to try if this contains a netty channel exception + Throwable cause = e.getCause(); + if (!(cause instanceof org.jboss.netty.channel.ChannelException || + cause instanceof java.net.BindException)) { + throw e; Review comment: you are right. yeah I think both place the comment is a bit misleading. 
My take from the comment is: "we can continue to retry the same port within this while loop" instead of the actual meaning: "we 'could allow' users to continue trying when we throw an exception to the user" This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718450#comment-16718450 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877583 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +999,47 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a global aggregate operation with an aggregate function. Use this before a selection +* to perform the selection operation. The output will be flattened if the output type is a +* composite type. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1) +* }}} +*/ + def aggregate(aggregateFunction: Expression): AggregatedTable = { +unwrap(aggregateFunction, tableEnv) match { + case _: Aggregation => + case _ => throw new ValidationException("Only AggregateFunction can be used in aggregate.") +} + +new AggregatedTable(this, Nil, aggregateFunction) + } + + /** +* Performs a global aggregate operation with an aggregate function. Use this before a selection +* to perform the selection operation. The output will be flattened if the output type is a +* composite type. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select("f0, f1") +* }}} +*/ + def aggregate(aggregateFunction: String): AggregatedTable = { +val withResolvedAggFunctionCall = replaceAggFunctionCall( Review comment: Replace with `groupBy().aggregate(aggregateFunction)` This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718446#comment-16718446 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877612 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ## @@ -418,4 +419,34 @@ object ProjectionTranslator { case e: Expression => e } } + + /** +* Unwrap a Call to ScalarFunctionCall, TableFunctionCall, AggFunctionCall or the built-in +* expression representation. +* +* @param fieldthe expression to unwrap +* @param tableEnv the TableEnvironment +* @return the unwrapped expression +*/ + def unwrap( + field: Expression, + tableEnv: TableEnvironment): Expression = { +field match { + case Alias(child, _, _) => +unwrap(child, tableEnv) + // Functions calls + case c @ Call(name, args: Seq[Expression]) => +val function = tableEnv.functionCatalog.lookupFunction(name, args) +unwrap(function, tableEnv) + // Other expressions + case e: Expression => e +} + } + + def extractFieldNames(expr: Expression): Seq[String] = { +expr match { + case Alias(child, name, extraNames) => Seq(name) ++ extraNames Review comment: replace child with _ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718453#comment-16718453 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877640 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala ## @@ -128,4 +128,71 @@ class AggregateStringExpressionTest extends TableTestBase { verifyTableEquals(resJava, resScala) } + + @Test + def testNonGroupedTableAggregate(): Unit = { +val util = streamTestUtil() +val t = util.addTable[(Int, Long, String)]('a, 'b, 'c) + +val testAgg = new CountMinMax +util.tableEnv.registerFunction("testAgg", testAgg) + +// Expression / Scala API +val resScala = t + .aggregate(testAgg('a)) + .select('f0, 'f1) + +// String / Java API +val resJava = t + .aggregate("testAgg(a)") + .select("f0, f1") + +verifyTableEquals(resScala, resJava) + } + + @Test + def testTableAggregate(): Unit = { Review comment: How about renaming it to `testAggregate`. To distinguish from the todo `flatAggregate`, `TableAggregateFunction`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
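The test above registers a `CountMinMax` aggregate function whose flattened result fields `f0, f1` are then selected. As a rough plain-Java analogue of that accumulator (a hypothetical sketch, not the actual Flink `AggregateFunction` subclass, whose accumulate/getValue contract is invoked by the runtime):

```java
/**
 * Plain-Java sketch of a CountMinMax-style accumulator. The result triple
 * (count, min, max) corresponds to the flattened fields f0, f1, f2
 * referenced in the test's select("f0, f1").
 */
public class CountMinMaxAcc {
    long count = 0;
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;

    /** Folds one input value into the accumulator. */
    void accumulate(int value) {
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    /** Emits the final aggregate as (count, min, max). */
    long[] getValue() {
        return new long[] {count, min, max};
    }
}
```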
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718452#comment-16718452 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877626 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala ## @@ -208,4 +208,58 @@ class AggregateValidationTest extends TableTestBase { // must fail. UDAGG does not accept String type .select("myWeightedAvg(c, a)") } + Review comment: Add a test to validate aggregate with two aggregate functions? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718449#comment-16718449 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877569 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +999,47 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a global aggregate operation with an aggregate function. Use this before a selection +* to perform the selection operation. The output will be flattened if the output type is a +* composite type. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1) +* }}} +*/ + def aggregate(aggregateFunction: Expression): AggregatedTable = { +unwrap(aggregateFunction, tableEnv) match { Review comment: Lines 1016:1021 are duplicated with lines 1139:1144. How about changing lines 1016:1021 to: `groupBy().aggregate(aggregateFunction)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877547 ## File path: docs/dev/table/tableApi.md ## @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov {% top %} +### Aggregate Review comment: Users may be confused to see `Aggregations` and `Aggregate`. Should we divide the current `Operations` chapter into two sub ones? - SQL base Operations - Non SQL base Operations And we can put all map/flatmap/aggregate/flataggregate into the `Non SQL base Operations` chapter. It is more clear. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718451#comment-16718451 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877558 ## File path: docs/dev/table/tableApi.md ## @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov {% top %} +### Aggregate + +Aggregate performs an aggregate operation with an aggregate function. You have to close the +"aggregate" with a select statement. The output will be flattened if the output type is a +composite type. + + + +{% highlight java %} +tableEnv.registerFunction("myAggFunc", new MyAggregateFunction()); Review comment: How about info user that `MyAggregateFunction` is an `AggregateFunction` by define a variable: `AggregateFunction myAggFunc = new MyAggregateFunction()`. Same as the scala example. We will have Table aggregate functions later. I think it would be nice if we distinguish them. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Aggregate operator to Table API > --- > > Key: FLINK-10976 > URL: https://issues.apache.org/jira/browse/FLINK-10976 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: sunjincheng >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Add Aggregate operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. 
> The usage: > {code:java} > val res = tab > .groupBy('a) // leave out groupBy-clause to define global aggregates > .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
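The `groupBy(...).agg(...).select(...)` chain in the usage snippet above can be sketched in plain Java, without any Flink dependency. This is only an illustration of the intended semantics (one accumulator per group key, composite output flattened into columns); `MinMaxAcc` and `groupAggregate` are hypothetical stand-ins, not Flink API:

```java
import java.util.*;

// Hypothetical stand-in for a user-defined AggregateFunction computing min and max.
class MinMaxAcc {
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;
    void accumulate(int v) { min = Math.min(min, v); max = Math.max(max, v); }
}

class GroupAggSketch {
    // Emulates tab.groupBy('a).agg(fun).select(...): one accumulator per group
    // key; the composite result is flattened into plain columns (min, max).
    static Map<String, int[]> groupAggregate(List<Object[]> rows) {
        Map<String, MinMaxAcc> accs = new LinkedHashMap<>();
        for (Object[] row : rows) {
            String key = (String) row[0];
            int value = (Integer) row[1];
            accs.computeIfAbsent(key, k -> new MinMaxAcc()).accumulate(value);
        }
        Map<String, int[]> result = new LinkedHashMap<>();
        for (Map.Entry<String, MinMaxAcc> e : accs.entrySet()) {
            result.put(e.getKey(), new int[] { e.getValue().min, e.getValue().max });
        }
        return result;
    }
}
```

Leaving out the grouping step corresponds to the "leave out groupBy-clause to define global aggregates" note: all rows feed a single accumulator.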
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718448#comment-16718448 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877602 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -1226,3 +1308,76 @@ class WindowGroupedTable( select(withResolvedAggFunctionCall: _*) } } + +class AggregatedTable( +private[flink] val table: Table, +private[flink] val groupKeys: Seq[Expression], +private[flink] val aggregateFunction: Expression) { + + /** +* Performs a selection operation after an aggregate operation. The field expressions +* cannot contain table functions and aggregations. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.groupBy('key).aggregate("aggFunc(a, b) as (f0, f1, f2)").select('key, 'f0, 'f1) +* }}} +*/ + def select(fields: Expression*): Table = { +val tableEnv = table.tableEnv +val (aggNames, propNames) = extractAggregationsAndProperties(Seq(aggregateFunction), tableEnv) +val projectsOnAgg = replaceAggregationsAndProperties( + groupKeys ++ Seq(aggregateFunction), tableEnv, aggNames, propNames) +val projectFields = extractFieldReferences(groupKeys ++ Seq(aggregateFunction)) + +val aggTable = new Table(tableEnv, + Project(projectsOnAgg, +Aggregate(groupKeys, aggNames.map(a => Alias(a._1, a._2)).toSeq, + Project(projectFields, table.logicalPlan).validate(tableEnv) +).validate(tableEnv) + ).validate(tableEnv)) + +// expand the aggregate results +val projectsOnAggTable = + aggTable.logicalPlan.output.take(groupKeys.length) ++ + expandProjectList( +Seq(Flattening(aggTable.logicalPlan.output.last)), +aggTable.logicalPlan, 
+tableEnv).zip(extractFieldNames(aggregateFunction)).map(a => Alias(a._1, a._2)) + +val expandedFields = expandProjectList(fields, aggTable.logicalPlan, tableEnv) Review comment: This `expandedFields` is used by the last `Project` below, so we can't expand fields from the agg Table. Instead, we should expand fields from the flattened Table after aggregate. The error can be reproduced by `select("*")` after aggregate. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718445#comment-16718445 ] ASF GitHub Bot commented on FLINK-10976: hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877547 ## File path: docs/dev/table/tableApi.md ## @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov {% top %} +### Aggregate Review comment: Users may be confused to see `Aggregations` and `Aggregate`. Should we divide the current `Operations` chapter into two sub-chapters? - SQL-based Operations - Non-SQL-based Operations And we can put all map/flatmap/aggregate/flataggregate into the `Non-SQL-based Operations` chapter. That would be clearer. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877583 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +999,47 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a global aggregate operation with an aggregate function. Use this before a selection +* to perform the selection operation. The output will be flattened if the output type is a +* composite type. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1) +* }}} +*/ + def aggregate(aggregateFunction: Expression): AggregatedTable = { +unwrap(aggregateFunction, tableEnv) match { + case _: Aggregation => + case _ => throw new ValidationException("Only AggregateFunction can be used in aggregate.") +} + +new AggregatedTable(this, Nil, aggregateFunction) + } + + /** +* Performs a global aggregate operation with an aggregate function. Use this before a selection +* to perform the selection operation. The output will be flattened if the output type is a +* composite type. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select("f0, f1") +* }}} +*/ + def aggregate(aggregateFunction: String): AggregatedTable = { +val withResolvedAggFunctionCall = replaceAggFunctionCall( Review comment: Replace with `groupBy().aggregate(aggregateFunction)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877602 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -1226,3 +1308,76 @@ class WindowGroupedTable( select(withResolvedAggFunctionCall: _*) } } + +class AggregatedTable( +private[flink] val table: Table, +private[flink] val groupKeys: Seq[Expression], +private[flink] val aggregateFunction: Expression) { + + /** +* Performs a selection operation after an aggregate operation. The field expressions +* cannot contain table functions and aggregations. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.groupBy('key).aggregate("aggFunc(a, b) as (f0, f1, f2)").select('key, 'f0, 'f1) +* }}} +*/ + def select(fields: Expression*): Table = { +val tableEnv = table.tableEnv +val (aggNames, propNames) = extractAggregationsAndProperties(Seq(aggregateFunction), tableEnv) +val projectsOnAgg = replaceAggregationsAndProperties( + groupKeys ++ Seq(aggregateFunction), tableEnv, aggNames, propNames) +val projectFields = extractFieldReferences(groupKeys ++ Seq(aggregateFunction)) + +val aggTable = new Table(tableEnv, + Project(projectsOnAgg, +Aggregate(groupKeys, aggNames.map(a => Alias(a._1, a._2)).toSeq, + Project(projectFields, table.logicalPlan).validate(tableEnv) +).validate(tableEnv) + ).validate(tableEnv)) + +// expand the aggregate results +val projectsOnAggTable = + aggTable.logicalPlan.output.take(groupKeys.length) ++ + expandProjectList( +Seq(Flattening(aggTable.logicalPlan.output.last)), +aggTable.logicalPlan, +tableEnv).zip(extractFieldNames(aggregateFunction)).map(a => Alias(a._1, a._2)) + +val expandedFields = expandProjectList(fields, aggTable.logicalPlan, tableEnv) Review comment: This `expandedFields` is used by the last `Project` below, so we 
can't expand fields from the agg Table. Instead, we should expand fields from the flattened Table after aggregate. The error can be reproduced by `select("*")` after aggregate. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877626 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/AggregateValidationTest.scala ## @@ -208,4 +208,58 @@ class AggregateValidationTest extends TableTestBase { // must fail. UDAGG does not accept String type .select("myWeightedAvg(c, a)") } + Review comment: Add a test to validate aggregate with two aggregate functions? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877612 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ## @@ -418,4 +419,34 @@ object ProjectionTranslator { case e: Expression => e } } + + /** +* Unwrap a Call to ScalarFunctionCall, TableFunctionCall, AggFunctionCall or the built-in +* expression representation. +* +* @param fieldthe expression to unwrap +* @param tableEnv the TableEnvironment +* @return the unwrapped expression +*/ + def unwrap( + field: Expression, + tableEnv: TableEnvironment): Expression = { +field match { + case Alias(child, _, _) => +unwrap(child, tableEnv) + // Functions calls + case c @ Call(name, args: Seq[Expression]) => +val function = tableEnv.functionCatalog.lookupFunction(name, args) +unwrap(function, tableEnv) + // Other expressions + case e: Expression => e +} + } + + def extractFieldNames(expr: Expression): Seq[String] = { +expr match { + case Alias(child, name, extraNames) => Seq(name) ++ extraNames Review comment: replace child with _ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877554 ## File path: docs/dev/table/tableApi.md ## @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov {% top %} +### Aggregate + +Aggregate performs an aggregate operation with an aggregate function. You have to close the +"aggregate" with a select statement. The output will be flattened if the output type is a Review comment: Add a sentence noting that an aggregate function cannot be used in the select statement, as this is checked in `select`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877558 ## File path: docs/dev/table/tableApi.md ## @@ -1682,6 +1682,36 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov {% top %} +### Aggregate + +Aggregate performs an aggregate operation with an aggregate function. You have to close the +"aggregate" with a select statement. The output will be flattened if the output type is a +composite type. + + + +{% highlight java %} +tableEnv.registerFunction("myAggFunc", new MyAggregateFunction()); Review comment: How about informing the user that `MyAggregateFunction` is an `AggregateFunction` by defining a variable: `AggregateFunction myAggFunc = new MyAggregateFunction()`, same as in the Scala example? We will have table aggregate functions later. I think it would be nice if we distinguish them. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877569 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -999,6 +999,47 @@ class Table( new OverWindowedTable(this, overWindows.toArray) } + /** +* Performs a global aggregate operation with an aggregate function. Use this before a selection +* to perform the selection operation. The output will be flattened if the output type is a +* composite type. +* +* Example: +* +* {{{ +* val aggFunc: AggregateFunction[_, _] = new MyAggregateFunction +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.aggregate("aggFunc(a, b) as (f0, f1, f2)").select('f0, 'f1) +* }}} +*/ + def aggregate(aggregateFunction: Expression): AggregatedTable = { +unwrap(aggregateFunction, tableEnv) match { Review comment: Lines 1016:1021 duplicate lines 1139:1144. How about changing lines 1016:1021 to: `groupBy().aggregate(aggregateFunction)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
hequn8128 commented on a change in pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235#discussion_r240877640 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala ## @@ -128,4 +128,71 @@ class AggregateStringExpressionTest extends TableTestBase { verifyTableEquals(resJava, resScala) } + + @Test + def testNonGroupedTableAggregate(): Unit = { +val util = streamTestUtil() +val t = util.addTable[(Int, Long, String)]('a, 'b, 'c) + +val testAgg = new CountMinMax +util.tableEnv.registerFunction("testAgg", testAgg) + +// Expression / Scala API +val resScala = t + .aggregate(testAgg('a)) + .select('f0, 'f1) + +// String / Java API +val resJava = t + .aggregate("testAgg(a)") + .select("f0, f1") + +verifyTableEquals(resScala, resJava) + } + + @Test + def testTableAggregate(): Unit = { Review comment: How about renaming it to `testAggregate`, to distinguish it from the upcoming `flatAggregate` and `TableAggregateFunction`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] samsai edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()
samsai edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016 @walterddr could you please check the timezone problem when dealing with event time? It seems that lamber-ken's commit only fixed processing time. Would you please refer to my previous comment and take a look at my event-time use case? Thanks a lot! --update: I found that the timestamp is changed in OutputRowtimeProcessFunction, using SqlFunctions.internalToTimestamp(); I'm not sure whether it's redundant or not. Please fix this problem. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718434#comment-16718434 ] ASF GitHub Bot commented on FLINK-11010: samsai edited a comment on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016 @walterddr could you please check the timezone problem when dealing with event time? It seems that lamber-ken's commit only fixed processing time. Would you please refer to my previous comment and take a look at my event-time use case? Thanks a lot! --update: I found that the timestamp is changed in OutputRowtimeProcessFunction, using SqlFunctions.internalToTimestamp(); I'm not sure whether it's redundant or not. Please fix this problem. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > The ProcessingTime is just implemented by invoking System.currentTimeMillis(), > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
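The wrapping statement quoted in the issue description can be illustrated with a small self-contained sketch: subtracting the zone offset shifts the epoch millis so that the `Timestamp`'s local-time rendering shows the UTC wall-clock instead. `shiftToUtcWallClock` is a hypothetical helper name, not Flink code:

```java
import java.util.TimeZone;

class OffsetSketch {
    // Mirrors `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time))`:
    // the returned millis, when rendered in `zone`, show the UTC wall-clock time.
    static long shiftToUtcWallClock(long epochMillis, TimeZone zone) {
        return epochMillis - zone.getOffset(epochMillis);
    }
}
```

With a fixed zone such as GMT+08:00 the shift is exactly eight hours, which is the kind of discrepancy against `currentProcessingTime()` the reporters observed.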
[jira] [Commented] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates
[ https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718430#comment-16718430 ] ASF GitHub Bot commented on FLINK-11136: dianfu opened a new pull request #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates URL: https://github.com/apache/flink/pull/7284 ## What is the purpose of the change *This pull request fixes the merge logic of DISTINCT aggregates.* ## Brief change log - *Fix the codegen logic in AggregationCodeGenerator to extract the distinct fields for merge* ## Verifying this change This change added tests and can be verified as follows: - *Updated integration test SqlITCase#testDistinctAggWithMergeOnEventTimeSessionGroupWindow* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix the logical of merge for DISTINCT aggregates > > > Key: FLINK-11136 > URL: https://issues.apache.org/jira/browse/FLINK-11136 > Project: Flink > Issue Type: Test > Components: Table API SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > The logic of merge for DISTINCT aggregates has a bug. 
For the following query: > {code:java} > SELECT > c, > COUNT(DISTINCT b), > SUM(DISTINCT b), > SESSION_END(rowtime, INTERVAL '0.005' SECOND) > FROM MyTable > GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code} > the following exception will be thrown: > {code:java} > Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58) > at > org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50) > at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66) > at > org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33) > at > org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341) > at > org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311) > at > 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
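The state layout behind the fix, and behind the FLINK-8739 optimization above, can be sketched in plain Java: a distinct map (value to reference count) is kept separately from the actual accumulator, and `merge` must iterate the other side's distinct map rather than feed raw rows into the inner accumulator, which is what produced the `ClassCastException` above. `DistinctSumAcc` and its fields are made-up names, not Flink's generated code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of DISTINCT SUM state: distinct map decoupled from the inner sum.
class DistinctSumAcc {
    Map<Integer, Long> distinct = new HashMap<>(); // value -> reference count
    long sum = 0;

    void accumulate(int v) {
        long before = distinct.getOrDefault(v, 0L);
        distinct.put(v, before + 1);
        if (before == 0L) sum += v; // only the first occurrence reaches the inner accumulator
    }

    // Merge iterates the other pane's distinct MAP, not its raw rows, so the
    // inner accumulator only ever sees each distinct value once.
    void merge(DistinctSumAcc other) {
        for (Map.Entry<Integer, Long> e : other.distinct.entrySet()) {
            long before = distinct.getOrDefault(e.getKey(), 0L);
            distinct.put(e.getKey(), before + e.getValue());
            if (before == 0L) sum += e.getKey(); // value is new on this side
        }
    }
}
```

Because the distinct map is decoupled from the inner accumulator, the same map could back both `COUNT(DISTINCT b)` and `SUM(DISTINCT b)`, which is the reuse FLINK-8739 describes.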
[jira] [Updated] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates
[ https://issues.apache.org/jira/browse/FLINK-11136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11136: --- Labels: pull-request-available (was: ) > Fix the logical of merge for DISTINCT aggregates > > > Key: FLINK-11136 > URL: https://issues.apache.org/jira/browse/FLINK-11136 > Project: Flink > Issue Type: Test > Components: Table API SQL >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > The logic of merge for DISTINCT aggregates has a bug; the failing query and the resulting ClassCastException are quoted in the comment above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11037) Introduce another greedy mechanism for distributing floating buffers
[ https://issues.apache.org/jira/browse/FLINK-11037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718422#comment-16718422 ] zhijiang edited comment on FLINK-11037 at 12/12/18 3:38 AM: [~StephanEwen], thanks for the above proposals. :) * The network stack is very performance-sensitive. We should consider overall fairness, as you mentioned, to avoid backpressure to some extent. * I think your proposal is a proper way to move this feature forward, just like the flag we currently keep for both credit and non-credit modes. * The adaptive algorithm sounds ideal but may behave unexpectedly in reality. So I agree with your point of starting with the simple, deterministic way first; then we can proceed step by step based on further experiments. Actually I implemented the extreme greedy algorithm as the first version in Alibaba for credit-based flow control; it survived the challenges of Double 11 and ran for half a year in production. While contributing the whole feature to the community, I changed to the fair algorithm and refactored our private branch to stay in sync with the community. In fact we have not compared the performance of these two algorithms in different scenarios, so it is hard to say which one is better. But I can think of two scenarios where the greedy algorithm should be better in theory. If the floating buffers are enough for all the input channels, then no matter whether the fair or the greedy algorithm is used, all the channels can always get all the required floating buffers from the pool. Another case is data skew, where only one input channel receives a large amount of data and the other input channels are almost empty. In these two cases the performance should be better with the greedy approach, because we only need one request to the pool to fetch all the floating buffers instead of looping many times to fetch one buffer each time, which enters a synchronized lock on the pool side. > Introduce another greedy mechanism for distributing floating buffers > > > Key: FLINK-11037 > URL: https://issues.apache.org/jira/browse/FLINK-11037 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.8.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > The current mechanism for distributing floating buffers is fair for all the > listeners.
In detail, each input channel can only request one floating buffer > at a time, even though the channel may actually need more floating buffers. The > channel then has to loop, requesting floating buffers until all its needs are satisfied > or the pool is exhausted. > Generally speaking, this approach seems fair for all the concurrent channels > invoked by the netty NIO thread. But every request from the LocalBufferPool needs to > acquire a synchronized lock, and it is hard to say which way of distributing the available floating > buffers behaves better in real scenarios. > Therefore we propose another greedy mechanism that requests more floating > buffers at a time. In the extreme case, we can even request all the required > buffers at once, or only part of them via configured parameters. On the other > side, the LocalBufferPool can also decide how many floating buffers should be assigned based on factors such as the total number of channels and the total number of floating buffers.
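The contrast discussed in this issue, fair one-buffer-per-request versus a greedy grab, can be sketched with a toy pool model (a hedged illustration in plain Java, not Flink's actual LocalBufferPool; all names are hypothetical). The point is how many times each mode enters the pool's lock:

```java
// Toy model of a buffer pool: counts how often the synchronized pool lock is entered.
public class BufferRequestDemo {
    private int available;        // remaining floating buffers in the pool
    private int lockAcquisitions; // number of locked pool requests made

    public BufferRequestDemo(int poolSize) {
        this.available = poolSize;
    }

    /** One locked request to the pool; grants at most {@code count} buffers. */
    private synchronized int take(int count) {
        lockAcquisitions++;
        int granted = Math.min(count, available);
        available -= granted;
        return granted;
    }

    /** Fair mode: loop, taking one floating buffer per locked pool request. */
    public int requestFair(int needed) {
        int got = 0;
        while (got < needed && take(1) == 1) {
            got++;
        }
        return got;
    }

    /** Greedy mode: ask the pool for the whole demand in a single locked request. */
    public int requestGreedy(int needed) {
        return take(needed);
    }

    public int lockAcquisitions() {
        return lockAcquisitions;
    }
}
```

With 5 buffers needed and enough available, the fair mode enters the lock five times while the greedy mode enters it once, which is exactly the contention argument made in the comment above.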
[GitHub] dianfu opened a new pull request #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates
dianfu opened a new pull request #7284: [FLINK-11136] [table] Fix the merge logic of DISTINCT aggregates URL: https://github.com/apache/flink/pull/7284 ## What is the purpose of the change *This pull request fixes the merge logic of DISTINCT aggregates.* ## Brief change log - *Fix the codegen logic in AggregationCodeGenerator to extract the distinct fields for merge* ## Verifying this change This change added tests and can be verified as follows: - *Updated integration test SqlITCase#testDistinctAggWithMergeOnEventTimeSessionGroupWindow* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11136) Fix the logic of merge for DISTINCT aggregates
Dian Fu created FLINK-11136: --- Summary: Fix the logic of merge for DISTINCT aggregates Key: FLINK-11136 URL: https://issues.apache.org/jira/browse/FLINK-11136 Project: Flink Issue Type: Test Components: Table API SQL Reporter: Dian Fu Assignee: Dian Fu The logic of merge for DISTINCT aggregates has a bug. For the following query: {code:java} SELECT c, COUNT(DISTINCT b), SUM(DISTINCT b), SESSION_END(rowtime, INTERVAL '0.005' SECOND) FROM MyTable GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c{code} the following exception will be thrown: {code:java} Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be cast to java.lang.Integer at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101) at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58) at org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50) at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source) at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66) at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33) at org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117) at org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341) at org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311) at 
org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
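The ClassCastException above arises because the merge path hands the whole distinct-map row to the underlying SumAggFunction instead of the distinct value itself. As a hedged illustration in plain Java (not Flink's generated code; all names here are hypothetical), a correct merge of a shared distinct accumulator has to combine the per-value reference counts first and forward only newly-seen values to the COUNT/SUM state:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of one shared distinct accumulator backing COUNT(DISTINCT b) and SUM(DISTINCT b).
public class DistinctAggDemo {
    final Map<Integer, Long> seen = new HashMap<>(); // value -> reference count
    long count; // COUNT(DISTINCT b)
    long sum;   // SUM(DISTINCT b)

    /** Per-record path: bump the reference count; update aggregates on first sighting only. */
    void accumulate(int value) {
        long refs = seen.merge(value, 1L, Long::sum);
        if (refs == 1) {
            count++;
            sum += value;
        }
    }

    /** Session-merge path: add reference counts, replaying only values this pane hasn't seen. */
    void merge(DistinctAggDemo other) {
        for (Map.Entry<Integer, Long> e : other.seen.entrySet()) {
            Long previous = seen.get(e.getKey());
            seen.merge(e.getKey(), e.getValue(), Long::sum);
            if (previous == null) { // genuinely new distinct value for this pane
                count++;
                sum += e.getKey();
            }
        }
    }
}
```

Merging two panes that both saw the value 5 must not double-count it, which is the invariant the codegen fix has to preserve.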
[jira] [Commented] (FLINK-10817) Upgrade presto dependency to support path-style access
[ https://issues.apache.org/jira/browse/FLINK-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718425#comment-16718425 ] ASF GitHub Bot commented on FLINK-10817: adamlamar commented on issue #7058: [FLINK-10817] Upgrade presto dependency to support path-style access URL: https://github.com/apache/flink/pull/7058#issuecomment-446449939 @zentol @aljoscha With FLINK-10987 merged, is this ready for another review? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Upgrade presto dependency to support path-style access > -- > > Key: FLINK-10817 > URL: https://issues.apache.org/jira/browse/FLINK-10817 > Project: Flink > Issue Type: Improvement >Reporter: Adam Lamar >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > > In order to use any given non-AWS s3 implementation backed by the presto s3 > filesystem, it is necessary to set at least one configuration parameter in > flink-conf.yaml: > * presto.s3.endpoint: https://example.com > This appears to work as expected for hosted s3 alternatives. > In order to use a bring-your-own, self-hosted s3 alternative like > [minio|https://www.minio.io/], at least two configuration parameters are > required: > * presto.s3.endpoint: https://example.com > * presto.s3.path-style-access: true > However, the second path-style-access parameter doesn't work because the > 0.185 version of presto doesn't support passing through that configuration > option to the hive s3 client. > To work around the issue, path-style-access can be forced on the s3 client by > using an IP address for the endpoint (instead of a hostname). Without this > workaround, flink attempts to use the virtualhost-style at > bucketname.example.com, which fails unless the expected DNS records exist. 
> To solve this problem and enable non-IP endpoints, upgrade the > [pom|https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36] > to at least 0.186 which includes [this > commit|https://github.com/prestodb/presto/commit/0707f2f21a96d2fd30953fb3fa9a9a03f03d88bd] > Note that 0.213 is the latest presto release. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
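For reference, the two settings described above would sit in flink-conf.yaml roughly like this (the endpoint value is the placeholder from the issue, not a real host):

```yaml
# flink-conf.yaml: point the presto s3 filesystem at a non-AWS endpoint
presto.s3.endpoint: https://example.com
# force path-style access (bucketname in the path, not the hostname);
# this second option is the one that requires presto >= 0.186
presto.s3.path-style-access: true
```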
[GitHub] adamlamar commented on issue #7058: [FLINK-10817] Upgrade presto dependency to support path-style access
adamlamar commented on issue #7058: [FLINK-10817] Upgrade presto dependency to support path-style access URL: https://github.com/apache/flink/pull/7058#issuecomment-446449939 @zentol @aljoscha With FLINK-10987 merged, is this ready for another review? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11037) Introduce another greedy mechanism for distributing floating buffers
[ https://issues.apache.org/jira/browse/FLINK-11037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718422#comment-16718422 ] zhijiang commented on FLINK-11037: -- [~StephanEwen], thanks for the proposals above. :) * The network stack is very performance-sensitive. We should consider overall fairness, as you mentioned, to avoid backpressure to some extent. * I think your proposal is a proper way to move this feature forward, just like the flag we currently keep for both the credit and non-credit modes. * The adaptive algorithm sounds ideal but may behave unexpectedly in reality, so I agree with your point of starting with a simple, deterministic way first; we can then proceed step by step based on further experiments. Actually, I implemented the extreme greedy algorithm as the first version at Alibaba for the credit-based mode; it went through the challenges of Double 11 and ran in production for half a year. While contributing the whole feature to the community, I switched to the fair algorithm and refactored our private branch to stay in sync with the community. In fact, we have not compared the specific performance of these two algorithms in different scenarios, so it is hard to say which one is better. But I can think of one scenario where the greedy algorithm should be better in theory. If there are enough floating buffers for all the input channels, then no matter whether the fair or greedy algorithm is used, all the channels can always get all the required floating buffers from the pool. In this case, performance should be better with the greedy approach because we only need one request to the pool to fetch all the floating buffers, instead of looping many times to fetch one buffer each time, where each request enters the synchronized lock on the pool side, contended among the different channels. 
> Introduce another greedy mechanism for distributing floating buffers > > > Key: FLINK-11037 > URL: https://issues.apache.org/jira/browse/FLINK-11037 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.8.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > > The current mechanism for distributing floating buffers is fair for all the > listeners. In detail, each input channel can only request one floating buffer > at a time, even though the channel may actually need more floating buffers. The > channel then has to loop, requesting floating buffers until all its needs are satisfied > or the pool is exhausted. > Generally speaking, this approach seems fair for all the concurrent channels > invoked by the netty NIO thread. But every request from the LocalBufferPool needs to > acquire a synchronized lock, and it is hard to say which way of distributing the available floating > buffers behaves better in real scenarios. > Therefore we propose another greedy mechanism that requests more floating > buffers at a time. In the extreme case, we can even request all the required > buffers at once, or only part of them via configured parameters. On the other > side, the LocalBufferPool can also decide how many floating buffers should be > assigned based on factors such as the total number of channels and the total number of > floating buffers. > The motivation is to make better use of floating buffer resources, and it may > need extra metrics for adjusting the mechanism dynamically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11135) Reduce priority of the deprecated Hadoop specific Flink conf options
Paul Lin created FLINK-11135: Summary: Reduce priority of the deprecated Hadoop specific Flink conf options Key: FLINK-11135 URL: https://issues.apache.org/jira/browse/FLINK-11135 Project: Flink Issue Type: Improvement Affects Versions: 1.7.0, 1.6.2 Reporter: Paul Lin Assignee: Paul Lin In [FLINK-7967] we marked the Hadoop-specific Flink configuration options as deprecated and recommended that users use the HADOOP_CONF_DIR env variable instead, but if both are configured, Flink will still look for Hadoop configurations in the path specified in the Flink configuration first. Plus, the code that searches for Hadoop configurations in `fs.hdfs.hadoopconf` is wrongly placed in the env-variables section. I think the search order should be: HADOOP_CONF_DIR > Flink conf options > HADOOP_HOME. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
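The search order proposed above (HADOOP_CONF_DIR > Flink conf options > HADOOP_HOME) amounts to a first-match resolution. A hedged sketch of that lookup (a hypothetical helper for illustration, not Flink's actual configuration code):

```java
public class HadoopConfResolution {

    /**
     * Returns the first configured Hadoop conf location following the order
     * proposed in the issue: HADOOP_CONF_DIR > Flink conf options > HADOOP_HOME.
     * Null arguments model "not configured".
     */
    static String resolve(String hadoopConfDirEnv, String flinkConfOption, String hadoopHomeEnv) {
        if (hadoopConfDirEnv != null) {
            return hadoopConfDirEnv;                  // env variable wins
        }
        if (flinkConfOption != null) {
            return flinkConfOption;                   // e.g. deprecated fs.hdfs.hadoopconf
        }
        if (hadoopHomeEnv != null) {
            return hadoopHomeEnv + "/etc/hadoop";     // conventional conf dir under HADOOP_HOME
        }
        return null;                                  // nothing configured
    }
}
```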
[GitHub] KarmaGYZ opened a new pull request #7283: [hotfix][docs] Fix the mismatch generic type in hadoop compatibility doc
KarmaGYZ opened a new pull request #7283: [hotfix][docs] Fix the mismatch generic type in hadoop compatibility doc URL: https://github.com/apache/flink/pull/7283 ## What is the purpose of the change This PR fixes the mismatched generic types in the Hadoop compatibility doc. ## Brief change log - Fix the mismatched generic type in section **Using Hadoop Mappers and Reducers** according to the context - Fix the mismatched generic type in section **Complete Hadoop WordCount Example** ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718417#comment-16718417 ] ASF GitHub Bot commented on FLINK-11010: samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016 @walterddr, could you please check the timezone problem when dealing with event time? It seems that lamber-ken's commit only fixed processing time. Could you please refer to my previous comment and take a look at my event-time use case? Thanks a lot! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > The ProcessingTime is implemented by simply invoking System.currentTimeMillis(), > but the long value is then automatically wrapped into a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
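The offset subtraction quoted in the description is exactly what makes the wrapped value diverge from System.currentTimeMillis() whenever the default timezone is not UTC. A hedged, self-contained sketch of that wrapping (illustrative only, not Flink's code; fixed zones keep the example deterministic):

```java
import java.util.TimeZone;

public class TimestampShiftDemo {

    /** Reproduces the wrapping described in the issue for an explicit zone. */
    static long wrap(long epochMillis, TimeZone zone) {
        // Shifts the epoch value by the zone's UTC offset, so the resulting
        // Timestamp no longer matches System.currentTimeMillis() off UTC.
        return epochMillis - zone.getOffset(epochMillis);
    }
}
```

For a JVM whose default zone is GMT+8, the wrapped value is 8 hours behind the raw processing time, which matches the inconsistency the issue reports; in UTC the two agree.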
[GitHub] samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()
samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() URL: https://github.com/apache/flink/pull/7180#issuecomment-446445016 @walterddr, could you please check the timezone problem when dealing with event time? It seems that lamber-ken's commit only fixed processing time. Could you please refer to my previous comment and take a look at my event-time use case? Thanks a lot! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11134) Invalid REST API request should not log the full exception in Flink logs
Mark Cho created FLINK-11134: Summary: Invalid REST API request should not log the full exception in Flink logs Key: FLINK-11134 URL: https://issues.apache.org/jira/browse/FLINK-11134 Project: Flink Issue Type: Bug Affects Versions: 1.7.0 Reporter: Mark Cho {code:java} 2018-12-11 17:52:19,207 ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception occurred in REST handler. org.apache.flink.runtime.rest.NotFoundException: Job 15d06690e88d309aa1bdbb6ce7c6dcd1 not found at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.lambda$getExecutionGraph$0(ExecutionGraphCache.java:133) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772) at akka.dispatch.OnComplete.internal(Future.scala:258) at akka.dispatch.OnComplete.internal(Future.scala:256) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (15d06690e88d309aa1bdbb6ce7c6dcd1) at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766) at org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:485) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at
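One common way to address the report above is to classify expected client errors (4xx, such as the NotFoundException for an unknown job ID) and log them as a concise one-liner at a low level, reserving full stack traces for unexpected server errors. A hedged sketch of that idea (a hypothetical helper, not Flink's actual handler code):

```java
public class RestErrorLogging {

    /**
     * Formats a log line for a REST handler failure. 4xx responses are expected
     * client errors and get a concise DEBUG-level message without a stack trace;
     * everything else keeps the full exception for troubleshooting.
     */
    static String format(int statusCode, Throwable t) {
        if (statusCode >= 400 && statusCode < 500) {
            return "DEBUG Request failed: " + t.getMessage();
        }
        return "ERROR Exception occurred in REST handler: " + t;
    }
}
```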
[GitHub] KarmaGYZ opened a new pull request #7282: [hotfix][docs] imporve docs of batch overview
KarmaGYZ opened a new pull request #7282: [hotfix][docs] imporve docs of batch overview URL: https://github.com/apache/flink/pull/7282 ## What is the purpose of the change This PR improves the Batch Overview docs by making the sample code for partitionCustom more readable. ## Brief change log - Improve the sample code for custom partitioning in Java and Scala ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
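For context, partitionCustom routes each record to a target channel by applying a user-supplied partitioner to a key. A dependency-free sketch of that contract (hedged: the interface mirrors the shape of Flink's Partitioner, but the bucketing harness is a hypothetical illustration, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class CustomPartitionDemo {

    /** Same shape as Flink's Partitioner<K>: map a key to a channel index. */
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Distributes keys into numPartitions buckets using the custom partitioner. */
    static <K> List<List<K>> partition(List<K> keys, Partitioner<K> p, int numPartitions) {
        List<List<K>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (K key : keys) {
            buckets.get(p.partition(key, numPartitions)).add(key);
        }
        return buckets;
    }
}
```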
[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java
[ https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718401#comment-16718401 ] Dian Fu commented on FLINK-11067: - Hi [~twalthr], thanks a lot for the solution. I agree with you that we should expose interfaces instead of implementations. I have some quick questions and am looking forward to your reply: 1. Regarding the return type BatchTableEnvironment of the following API: {code:java} static BatchTableEnvironment getTableEnvironment(BatchEnvironment env);{code} What is the package name of BatchTableEnvironment? It seems that backwards compatibility will be broken no matter what it is. 2. Will the facade class for Batch/StreamTableEnvironment cover both Scala and Java, or will there be separate facade classes for Scala and Java? > Port TableEnvironments to Java > -- > > Key: FLINK-11067 > URL: https://issues.apache.org/jira/browse/FLINK-11067 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Dawid Wysakowicz >Priority: Major > > This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, > {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided > and discussed. Some refactoring and clean up might be necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11133) FsCheckpointStorage is unaware of S3 entropy when creating directories
Mark Cho created FLINK-11133: Summary: FsCheckpointStorage is unaware of S3 entropy when creating directories Key: FLINK-11133 URL: https://issues.apache.org/jira/browse/FLINK-11133 Project: Flink Issue Type: Bug Affects Versions: 1.7.0 Reporter: Mark Cho We currently use S3 for our checkpoint storage, with S3 entropy enabled. Entropy seems to work correctly when writing the checkpoint metadata file (the entropy key is correctly stripped from `state.checkpoints.dir`) and when writing checkpoint data files (the entropy key is correctly replaced with a random string). However, from the logs, it seems that the entropy key is neither stripped nor replaced when `FsCheckpointStorage` creates directories in the following class: [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java#L83-L85] Should FsCheckpointStorage skip the initial mkdir calls when an object store like S3 is used, since S3 has no directory concept? If we want to keep the `mkdir` calls in `FsCheckpointStorage`, we should handle the entropy key specified in `state.checkpoints.dir`. Currently, folder markers in S3 are created by the Hadoop FileSystem with the entropy key still in the path, as a result of the `mkdir` calls in `FsCheckpointStorage`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
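The strip-or-replace behavior the issue expects can be sketched as follows (a hedged toy helper, not FsCheckpointStorage's code; the `_entropy_` marker string is an assumption chosen for illustration): data-file paths get the marker replaced with random characters, while metadata writes and mkdir-style paths get the marker removed.

```java
public class EntropyPathDemo {

    /**
     * Resolves an entropy marker in a checkpoint path.
     * injectEntropy = true  -> replace the marker (checkpoint data files)
     * injectEntropy = false -> strip the marker (metadata files, mkdir calls)
     */
    static String resolve(String path, String entropyKey, boolean injectEntropy, String entropy) {
        if (!path.contains(entropyKey)) {
            return path; // no marker configured in this path
        }
        if (injectEntropy) {
            return path.replace(entropyKey, entropy);
        }
        // Strip the marker and its trailing slash so no "_entropy_" folder marker is created.
        return path.replace(entropyKey + "/", "").replace(entropyKey, "");
    }
}
```

Under this model, the bug report is that the mkdir path takes neither branch and keeps the raw marker in the S3 key.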
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718365#comment-16718365 ] ASF GitHub Bot commented on FLINK-10461: klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#issuecomment-446434187 @azagrebin Thank you for your explanation. Following it, I will - first move `DirectExecutorService` into `org.apache.flink.runtime.concurrent` and change `Executor Executors#directExecutor()` to `DirectExecutorService Executors#directExecutorService()`; this will be done in a separate commit. - then use `Executors#directExecutorService()` in the current patch to share the logic between `threadNum = 1` and `threadNum > 1`. Did I understand correctly? If yes, I will file an issue and implement the first move, and then come back to complete this patch. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore downloads files from DFS, and the > download procedure is single-threaded; it could be sped up by using > multiple threads to download states from DFS. > > In my company, the states can reach several terabytes, so the restore > procedure becomes a little slow. After a bit of digging, I found that states are > downloaded from DFS using a single thread; multiple threads could be used to speed > this up. 
> I tested the time needed to download ~2 terabytes of state from DFS. > With a single thread it took 640+ s, versus 130+ s with 5 download threads. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
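The plan discussed in the comment above, sharing one code path between `threadNum = 1` and `threadNum > 1` via a direct executor service, could look roughly like this (a hedged sketch; `directExecutorService` here is a minimal stand-in written for illustration, not the Flink utility under discussion, and the "download" is simulated):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelDownloadDemo {

    /** One code path for both cases: a real pool when threadNum > 1, otherwise
     *  an executor that runs each task inline in the calling thread. */
    static ExecutorService executorFor(int threadNum) {
        if (threadNum > 1) {
            return Executors.newFixedThreadPool(threadNum);
        }
        return new AbstractExecutorService() { // minimal direct executor service
            @Override public void execute(Runnable command) { command.run(); }
            @Override public void shutdown() {}
            @Override public List<Runnable> shutdownNow() { return Collections.emptyList(); }
            @Override public boolean isShutdown() { return true; }
            @Override public boolean isTerminated() { return true; }
            @Override public boolean awaitTermination(long t, TimeUnit u) { return true; }
        };
    }

    /** Stand-in for restoring state files: "downloads" each file and sums the bytes. */
    static long downloadAll(List<String> files, int threadNum) throws Exception {
        ExecutorService executor = executorFor(threadNum);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (String file : files) {
                tasks.add(() -> (long) file.length()); // real code would fetch from DFS here
            }
            long total = 0;
            for (Future<Long> f : executor.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller-side logic is identical for both thread counts; only the executor construction differs, which is exactly the deduplication the review asked for.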
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718372#comment-16718372 ] ASF GitHub Bot commented on FLINK-11081: yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#issuecomment-446434964 Hi @walterddr, thank you for your review. I responded to your comments, and I think some of the suggestions make sense. In addition, rest.port will not be deprecated; it will from now on only be used by the REST client, which makes its responsibilities clearer (it was originally used by both the server and the client). See the description of this issue. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be > useful to be able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add, similar to > {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which > allows specifying a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server
yanghua commented on issue #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#issuecomment-446434964 Hi @walterddr, thank you for your review. I responded to your comments, and I think some of the suggestions make sense. In addition, rest.port will not be deprecated; it will from now on only be used by the REST client, which makes its responsibilities clearer (it was originally used by both the server and the client). See the description of this issue. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718363#comment-16718363 ] ASF GitHub Bot commented on FLINK-11081: yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240860330 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointConfigurationTest.java ## @@ -49,17 +77,154 @@ public void testBasicMapping() throws ConfigurationException { Configuration originalConfig = new Configuration(); originalConfig.setString(RestOptions.ADDRESS, ADDRESS); originalConfig.setString(RestOptions.BIND_ADDRESS, BIND_ADDRESS); - originalConfig.setInteger(RestOptions.PORT, PORT); + originalConfig.setString(RestOptions.BIND_PORT, BIND_PORT); originalConfig.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, CONTENT_LENGTH); originalConfig.setString(WebOptions.TMP_DIR, temporaryFolder.getRoot().getAbsolutePath()); final RestServerEndpointConfiguration result = RestServerEndpointConfiguration.fromConfiguration(originalConfig); Assert.assertEquals(ADDRESS, result.getRestAddress()); Assert.assertEquals(BIND_ADDRESS, result.getRestBindAddress()); - Assert.assertEquals(PORT, result.getRestBindPort()); + Assert.assertEquals(BIND_PORT, result.getRestBindPort()); Assert.assertEquals(CONTENT_LENGTH, result.getMaxContentLength()); Assert.assertThat( result.getUploadDir().toAbsolutePath().toString(), containsString(temporaryFolder.getRoot().getAbsolutePath())); } + Review comment: OK
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718361#comment-16718361 ] ASF GitHub Bot commented on FLINK-11081: yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240860279 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator<Integer> portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid port range definition: " + restBindPort); + } + + int chosenPort = 0; + while (portsIterator.hasNext()) { + try { + chosenPort = portsIterator.next(); + if (restBindAddress == null) { + channel = bootstrap.bind(chosenPort); + } else { + channel = bootstrap.bind(restBindAddress, chosenPort); + } + serverChannel = channel.syncUninterruptibly().channel(); + break; + } catch (Exception e) { Review comment: Will add a test that makes the first port conflict so that the second port is chosen.
[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#issuecomment-446434187 @azagrebin Thank you for your explanation. Per your explanation, I will - first move `DirectExecutorService` into `org.apache.flink.runtime.concurrent` and change `Executor Executors#directExecutor()` to `DirectExecutorService Executors#directExecutorService()`; this will be done in a separate commit. - then use `Executors#directExecutorService()` in the current patch to share the logic between `threadNum = 1` and `threadNum > 1`. Did I understand that right? If yes, I will file an issue and implement the first move, and then come back to complete this patch.
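The `DirectExecutorService` idea discussed above can be sketched in plain JDK Java. The class below is illustrative only (the name and shape are assumptions, not Flink's actual `org.apache.flink.runtime.concurrent` code): every submitted task runs synchronously in the calling thread, which is what lets code written against `ExecutorService` serve both the `threadNum = 1` and `threadNum > 1` cases.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Minimal sketch of a "direct" ExecutorService: execute() runs the task
 * in the caller's thread instead of handing it to a pool.
 */
public class DirectExecutorServiceSketch extends AbstractExecutorService {

    private volatile boolean shutdown;

    @Override
    public void execute(Runnable command) {
        command.run(); // synchronous: no pool thread involved
    }

    @Override
    public void shutdown() { shutdown = true; }

    @Override
    public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }

    @Override
    public boolean isShutdown() { return shutdown; }

    @Override
    public boolean isTerminated() { return shutdown; }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }

    public static void main(String[] args) throws Exception {
        DirectExecutorServiceSketch exec = new DirectExecutorServiceSketch();
        // submit() is inherited from AbstractExecutorService and goes through execute(),
        // so the task runs in the main thread.
        String taskThread = exec.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(taskThread.equals(Thread.currentThread().getName())); // prints "true"
    }
}
```

Extending `AbstractExecutorService` keeps the sketch small because `submit`/`invokeAll` are inherited and only `execute` plus the lifecycle methods need implementing.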
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718357#comment-16718357 ] ASF GitHub Bot commented on FLINK-11081: yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240859949 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator<Integer> portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid port range definition: " + restBindPort); + } + + int chosenPort = 0; + while (portsIterator.hasNext()) { + try { + chosenPort = portsIterator.next(); + if (restBindAddress == null) { + channel = bootstrap.bind(chosenPort); + } else { + channel = bootstrap.bind(restBindAddress, chosenPort); + } + serverChannel = channel.syncUninterruptibly().channel(); + break; + } catch (Exception e) { + // we can continue to try if this contains a netty channel exception + Throwable cause = e.getCause(); + if (!(cause instanceof org.jboss.netty.channel.ChannelException || + cause instanceof java.net.BindException)) { + throw e; Review comment: Here we only retry when the exception indicates a port conflict. Otherwise we should rethrow it so that the caller knows a different error occurred. See also `BootstrapTools#startActorSystem`.
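The retry strategy in the diff above can be sketched with plain `java.net` instead of Netty (the class and method names below are illustrative assumptions, not the actual Flink code): try each port of the range in order, swallow only bind conflicts, and keep the first port that binds.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;

/** Sketch: bind the first free port in [firstPort, lastPort]. */
public class PortRangeBindSketch {

    static ServerSocket bindFirstFree(int firstPort, int lastPort) throws IOException {
        for (int port = firstPort; port <= lastPort; port++) {
            try {
                return new ServerSocket(port); // success: stop trying further ports
            } catch (BindException e) {
                // port conflict -> continue with the next port in the range
            }
        }
        throw new BindException("No free port in range " + firstPort + "-" + lastPort);
    }

    public static void main(String[] args) throws IOException {
        // Occupy some free port, then ask for a range starting at that port:
        // the conflict forces the sketch to fall through to a later port.
        try (ServerSocket blocker = new ServerSocket(0)) {
            int taken = blocker.getLocalPort();
            try (ServerSocket chosen = bindFirstFree(taken, taken + 3)) {
                System.out.println(chosen.getLocalPort() > taken);
            }
        }
    }
}
```

Only `BindException` is caught; any other `IOException` propagates, mirroring the review point that non-conflict errors should reach the caller.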
[jira] [Commented] (FLINK-11083) CRowSerializerConfigSnapshot is not instantiable
[ https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718356#comment-16718356 ] ASF GitHub Bot commented on FLINK-11083: kisimple commented on issue #7267: [FLINK-11083][Table] CRowSerializerConfigSnapshot is not instantiable URL: https://github.com/apache/flink/pull/7267#issuecomment-446432902 cc @pnowojski @dawidwys > CRowSerializerConfigSnapshot is not instantiable > > > Key: FLINK-11083 > URL: https://issues.apache.org/jira/browse/FLINK-11083 > Project: Flink > Issue Type: Bug > Components: Table API SQL, Type Serialization System >Reporter: boshu Zheng >Assignee: boshu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0, 1.7.1 > > > An exception was encountered when restarting a job with savepoint in our > production env, > {code:java} > 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task > :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> > Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING > to FAILED. > java.lang.Exception: Exception while creating StreamOperatorStateContext. 
> at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.util.FlinkException: Could not restore operator > state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) > from any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) > ... 5 more > Caused by: java.lang.RuntimeException: The class > 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' > is not instantiable: The class has no (implicit) public nullary constructor, > i.e. a constructor without arguments. 
> at > org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) > at > org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105) > at >
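The root cause in the stack trace above is the nullary-constructor check in `InstantiationUtil.checkForInstantiation`: frameworks that restore snapshots reflectively need a public no-argument constructor. The sketch below (illustrative, not Flink's actual `InstantiationUtil`) shows the check and why a class whose only constructor takes arguments fails it.

```java
/** Sketch of a reflective "is instantiable" check requiring a public nullary constructor. */
public class NullaryConstructorSketch {

    /** Has an implicit public no-arg constructor -> instantiable. */
    public static class Instantiable { }

    /** Only constructor takes an argument -> no nullary constructor. */
    public static class NotInstantiable {
        public NotInstantiable(int x) { }
    }

    /** Returns true iff clazz exposes a public no-arg constructor. */
    static boolean isInstantiable(Class<?> clazz) {
        try {
            clazz.getConstructor(); // looks up the public nullary constructor
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isInstantiable(Instantiable.class));    // prints "true"
        System.out.println(isInstantiable(NotInstantiable.class)); // prints "false"
    }
}
```

The usual fix for failures like the one reported is to add an explicit public no-arg constructor to the config-snapshot class (deserialization then populates its fields afterwards).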
[jira] [Assigned] (FLINK-11130) Migrate flink-table runtime triggers class
[ https://issues.apache.org/jira/browse/FLINK-11130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shenlei reassigned FLINK-11130: --- Assignee: shenlei > Migrate flink-table runtime triggers class > -- > > Key: FLINK-11130 > URL: https://issues.apache.org/jira/browse/FLINK-11130 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: shenlei >Assignee: shenlei >Priority: Major > > As discussed in FLINK-11065, this is a sub-task of FLINK-11065. This task > migrates the flink-table runtime StateCleaningCountTrigger class to the > flink-table-runtime module
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718350#comment-16718350 ] ASF GitHub Bot commented on FLINK-11081: yanghua commented on a change in pull request #7263: [FLINK-11081] Support binding port range for REST server URL: https://github.com/apache/flink/pull/7263#discussion_r240857598 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ## @@ -181,14 +184,44 @@ protected void initChannel(SocketChannel ch) { .channel(NioServerSocketChannel.class) .childHandler(initializer); - log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); - final ChannelFuture channel; - if (restBindAddress == null) { - channel = bootstrap.bind(restBindPort); - } else { - channel = bootstrap.bind(restBindAddress, restBindPort); + ChannelFuture channel; + + // parse port range definition and create port iterator + Iterator<Integer> portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(restBindPort); + } catch (Exception e) { Review comment: `getPortRangeFromString` also throws `IllegalConfigurationException`, which is an unchecked exception defined in `flink-core`, so here we catch a general `Exception` and then rethrow a unified `IllegalArgumentException` in the catch block. The same code style is used in `BootstrapTools#startActorSystem`.
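What a port-range parser like `NetUtils.getPortRangeFromString` does, and the "unify into one unchecked exception" style discussed above, can be sketched as follows (simplified to a single range; illustrative names, not the actual Flink implementation): accept "8080" or "8080-8085" and wrap any parse failure in one `IllegalArgumentException`.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: parse a port or "min-max" port range into the list of candidate ports. */
public class PortRangeParseSketch {

    static List<Integer> parse(String spec) {
        try {
            int dash = spec.indexOf('-');
            int first = Integer.parseInt((dash < 0 ? spec : spec.substring(0, dash)).trim());
            int last = dash < 0 ? first : Integer.parseInt(spec.substring(dash + 1).trim());
            if (first < 0 || last > 65535 || first > last) {
                throw new NumberFormatException("port range out of bounds");
            }
            List<Integer> ports = new ArrayList<>();
            for (int p = first; p <= last; p++) {
                ports.add(p);
            }
            return ports;
        } catch (Exception e) {
            // unify all parse failures into one unchecked exception type for callers
            throw new IllegalArgumentException("Invalid port range definition: " + spec, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("8080-8082")); // prints "[8080, 8081, 8082]"
    }
}
```

Passing the original exception as the cause preserves the underlying reason (bad number, bounds) while callers only need to handle `IllegalArgumentException`.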
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718339#comment-16718339 ] ASF GitHub Bot commented on FLINK-10566: fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs URL: https://github.com/apache/flink/pull/7276#discussion_r240855467 ## File path: flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.planning; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.client.program.OptimizerPlanEnvironment; +import org.apache.flink.client.program.PreviewPlanEnvironment; +import org.apache.flink.configuration.Configuration; + +import org.junit.Test; + +import java.util.Collection; + +/** + * Tests that large programs can be compiled to a Plan in reasonable amount of time. + */ +public class LargePlanTest { + + @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 15_000) + public void testPlanningOfLargePlan() throws Exception { + runProgram(new PreviewPlanEnvironment(), 10, 50); + } + + private static void runProgram(ExecutionEnvironment env, int depth, int width) throws Exception { + DataSet<String> input = env.fromElements("a", "b", "c"); + DataSet<String> stats = null; + + for (int i = 0; i < depth; i++) { + stats = analyze(input, stats, width / (i + 1) + 1); + } + + stats.output(new DiscardingOutputFormat<>()); + env.execute("depth " + depth + " width " + width); + } + + private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) { + for (int i = 0; i < branches; i++) { + final int ii = i; + + if (stats != null) { + input = input.map(new RichMapFunction<String, String>() { + + @Override + public void open(Configuration parameters) throws Exception { + Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats"); + } + + @Override + public String map(String value) throws Exception { + return value; + } + }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats"); + } + + TupleTypeInfo<Tuple2<Integer, String>> typeInfo = + new TupleTypeInfo<>(TypeInformation.of(Integer.class), TypeInformation.of(String.class)); + DataSet<String> branch = input + .map(s -> new Tuple2<>(0, s + ii)).returns(typeInfo) + .groupBy(0) + .minBy(1) + .map(kv -> kv.f1).returns(TypeInformation.of(String.class)); Review comment: `returns(Types.STRING)` > Flink Planning is exponential in the number of stages > - > > Key: FLINK-10566 > URL: https://issues.apache.org/jira/browse/FLINK-10566 > Project: Flink > Issue Type: Bug >
[GitHub] fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855219

## File path: flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of time.
+ */
+public class LargePlanTest {
+
+    @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 15_000)
+    public void testPlanningOfLargePlan() throws Exception {
+        runProgram(new PreviewPlanEnvironment(), 10, 50);
+    }
+
+    private static void runProgram(ExecutionEnvironment env, int depth, int width) throws Exception {
+        DataSet<String> input = env.fromElements("a", "b", "c");
+        DataSet<String> stats = null;
+
+        for (int i = 0; i < depth; i++) {
+            stats = analyze(input, stats, width / (i + 1) + 1);

Review comment: indent
[GitHub] fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855467

## File path: flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+
+import java.util.Collection;
+
+/**
+ * Tests that large programs can be compiled to a Plan in a reasonable amount of time.
+ */
+public class LargePlanTest {
+
+    @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 15_000)
+    public void testPlanningOfLargePlan() throws Exception {
+        runProgram(new PreviewPlanEnvironment(), 10, 50);
+    }
+
+    private static void runProgram(ExecutionEnvironment env, int depth, int width) throws Exception {
+        DataSet<String> input = env.fromElements("a", "b", "c");
+        DataSet<String> stats = null;
+
+        for (int i = 0; i < depth; i++) {
+            stats = analyze(input, stats, width / (i + 1) + 1);
+        }
+
+        stats.output(new DiscardingOutputFormat<>());
+        env.execute("depth " + depth + " width " + width);
+    }
+
+    private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
+        for (int i = 0; i < branches; i++) {
+            final int ii = i;
+
+            if (stats != null) {
+                input = input.map(new RichMapFunction<String, String>() {
+
+                    @Override
+                    public void open(Configuration parameters) throws Exception {
+                        Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
+                    }
+
+                    @Override
+                    public String map(String value) throws Exception {
+                        return value;
+                    }
+                }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
+            }
+
+            TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
+                new TupleTypeInfo<>(TypeInformation.of(Integer.class), TypeInformation.of(String.class));
+            DataSet<String> branch = input
+                .map(s -> new Tuple2<>(0, s + ii)).returns(typeInfo)
+                .groupBy(0)
+                .minBy(1)
+                .map(kv -> kv.f1).returns(TypeInformation.of(String.class));

Review comment: `returns(Types.STRING)`
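The shorthand the reviewer points at is Flink's `org.apache.flink.api.common.typeinfo.Types` class, whose constants stand in for explicit `TypeInformation.of(...)` calls. A minimal sketch of the equivalence (assumes Flink is on the classpath; `ReturnsShorthandSketch` is an illustrative class name, not part of the PR):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class ReturnsShorthandSketch {
    public static void main(String[] args) {
        // Verbose form used in the test's diff:
        TypeInformation<String> verbose = TypeInformation.of(String.class);
        // Shorthand suggested in the review, e.g. .returns(Types.STRING):
        TypeInformation<String> shorthand = Types.STRING;
        // Both describe the same string type information.
        System.out.println(verbose.equals(shorthand));
    }
}
```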
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718338#comment-16718338 ]

ASF GitHub Bot commented on FLINK-10566:

fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855219

## File path: flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java

@@ -0,0 +1,92 @@
[...]
+        for (int i = 0; i < depth; i++) {
+            stats = analyze(input, stats, width / (i + 1) + 1);

Review comment: indent

> Flink Planning is exponential in the number of stages
> -----------------------------------------------------
>
>                 Key: FLINK-10566
>                 URL: https://issues.apache.org/jira/browse/FLINK-10566
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Robert Bradshaw
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: chart.png
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The execution itself is still sub-second, but the job submission takes increasingly long.)
> I can reproduce this with the following pipeline, which resembles my real-world workloads (with depth up to 10 and width up, and past, 50). On Flink it seems getting beyond width 10 is problematic (times out after hours). Note the log scale on the chart for time.
>
> {code:java}
> public static void runPipeline(int depth, int width) throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<String> input = env.fromElements("a", "b", "c");
>     DataSet<String> stats = null;
>     for (int i = 0; i < depth; i++) {
>         stats = analyze(input, stats, width / (i + 1) + 1);
>     }
>     stats.writeAsText("out.txt");
>     env.execute("depth " + depth + " width " + width);
> }
>
> public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
>     System.out.println("analyze " + branches);
>     for (int i = 0; i < branches; i++) {
>         final int ii = i;
>         if (stats != null) {
>             input = input.map(new RichMapFunction<String, String>() {
>                 @Override
>                 public void open(Configuration parameters) throws Exception {
>                     Collection
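The reported pipeline builds a layered job graph: each of the `depth` levels fans out into `width / (i + 1) + 1` branches that are merged back together. One way to see why naive plan enumeration explodes is to count source-to-sink paths in such a graph, since the per-level branch counts multiply. A plain-Java sketch of that intuition (no Flink needed; `branchesAt` and `pathCount` are illustrative names mirroring the reproduction's formula, not part of the Flink optimizer):

```java
public class PlanPathCount {

    // Branches created at level i, as in the reproduction: width / (i + 1) + 1.
    public static int branchesAt(int i, int width) {
        return width / (i + 1) + 1;
    }

    // Source-to-sink paths in a layered DAG where level i fans out into
    // branchesAt(i) branches that are merged again: the counts multiply,
    // so the total grows exponentially with depth.
    public static long pathCount(int depth, int width) {
        long paths = 1;
        for (int i = 0; i < depth; i++) {
            paths *= branchesAt(i, width);
        }
        return paths;
    }

    public static void main(String[] args) {
        for (int depth = 1; depth <= 10; depth++) {
            System.out.println("depth " + depth + ": " + pathCount(depth, 50) + " paths");
        }
    }
}
```

For depth 10, width 50 (the JIRA's problematic configuration) this already yields tens of billions of paths, matching the observation that submission time blows up while the execution itself stays sub-second.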
[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718336#comment-16718336 ]

ASF GitHub Bot commented on FLINK-10566:

fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855139

## File path: flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java

@@ -0,0 +1,92 @@
[...]
+            TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
+                new TupleTypeInfo<>(TypeInformation.of(Integer.class), TypeInformation.of(String.class));
+            DataSet<String> branch = input
+                .map(s -> new Tuple2<>(0, s + ii)).returns(typeInfo)

Review comment: can be simplified to `.returns(Types.TUPLE(Types.INT, Types.STRING))`

> Flink Planning is exponential in the number of stages
> -----------------------------------------------------
>
>                 Key: FLINK-10566
>                 URL: https://issues.apache.org/jira/browse/FLINK-10566
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Robert Bradshaw
>            Assignee:
[GitHub] fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
fhueske commented on a change in pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276#discussion_r240855139

## File path: flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java

@@ -0,0 +1,92 @@
[...]
+            TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
+                new TupleTypeInfo<>(TypeInformation.of(Integer.class), TypeInformation.of(String.class));
+            DataSet<String> branch = input
+                .map(s -> new Tuple2<>(0, s + ii)).returns(typeInfo)

Review comment: can be simplified to `.returns(Types.TUPLE(Types.INT, Types.STRING))`
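The tuple constructed in the test is `new Tuple2<>(0, s + ii)`, i.e. `Tuple2<Integer, String>`, so the `Types.TUPLE` shorthand must keep that field order. A sketch of the suggested simplification (assumes Flink is on the classpath; `TupleTypeShorthandSketch` is an illustrative class name, not part of the PR):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class TupleTypeShorthandSketch {
    public static void main(String[] args) {
        // Verbose form from the diff:
        TupleTypeInfo<Tuple2<Integer, String>> verbose =
            new TupleTypeInfo<>(TypeInformation.of(Integer.class), TypeInformation.of(String.class));
        // Shorthand; field order must match Tuple2<Integer, String>:
        TypeInformation<Tuple2<Integer, String>> shorthand = Types.TUPLE(Types.INT, Types.STRING);
        // Both describe the same tuple type information.
        System.out.println(verbose.equals(shorthand));
    }
}
```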