[GitHub] flink pull request #4704: [hotfix] [docs] Update function name in DataStream...
GitHub user phiradet opened a pull request: https://github.com/apache/flink/pull/4704 [hotfix] [docs] Update function name in DataStream API Currently, the `javaStream` should be used to get the underlying java DataStream object -- not `getJavaStream` You can merge this pull request into a Git repository by running: $ git pull https://github.com/phiradet/flink hotfix-doc-datastream_api Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4704.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4704 commit b7f9544fe65adbab88ddc2869f8d6a523291e938 Author: Bang, Phiradet Date: 2017-09-22T06:35:37Z To get the underlying java DataStream object, the javaStream should be used, not getJavaStream ---
[jira] [Commented] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp
[ https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176009#comment-16176009 ] Bowen Li commented on FLINK-7388: - Interesting one. I just got some experience on ProcessFunction and would like to re-enforce my understanding of it > ProcessFunction.onTimer() sets processing time as timestamp > --- > > Key: FLINK-7388 > URL: https://issues.apache.org/jira/browse/FLINK-7388 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0, 1.3.2 >Reporter: Fabian Hueske >Assignee: Bowen Li > > The {{ProcessFunction.onTimer()}} method sets the current processing time as > event-time timestamp when it is called from a processing time timer. > I don't think this behavior is useful. Processing time timestamps won't be > aligned with watermarks and are not deterministic. The only reason would be > to have _some_ value in the timestamp field. However, the behavior is very > subtle and might not be noticed by users. > IMO, it would be better to erase the timestamp. This will cause downstream > operator that rely on timestamps to fail and notify the users that the logic > they implemented was probably not what they intended to do. > What do you think [~aljoscha]? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp
[ https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7388: --- Assignee: Bowen Li > ProcessFunction.onTimer() sets processing time as timestamp > --- > > Key: FLINK-7388 > URL: https://issues.apache.org/jira/browse/FLINK-7388 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0, 1.3.2 >Reporter: Fabian Hueske >Assignee: Bowen Li > > The {{ProcessFunction.onTimer()}} method sets the current processing time as > event-time timestamp when it is called from a processing time timer. > I don't think this behavior is useful. Processing time timestamps won't be > aligned with watermarks and are not deterministic. The only reason would be > to have _some_ value in the timestamp field. However, the behavior is very > subtle and might not be noticed by users. > IMO, it would be better to erase the timestamp. This will cause downstream > operator that rely on timestamps to fail and notify the users that the logic > they implemented was probably not what they intended to do. > What do you think [~aljoscha]? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7433) Lack of synchronization accessing inProgress in JobCancellationWithSavepointHandlers
[ https://issues.apache.org/jira/browse/FLINK-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175981#comment-16175981 ] Bowen Li commented on FLINK-7433: - [~tedyu] Hi Ted, this seems to be a dup of FLINK-7659. I've submitted a PR for FLINK-7659, and will close this one > Lack of synchronization accessing inProgress in > JobCancellationWithSavepointHandlers > > > Key: FLINK-7433 > URL: https://issues.apache.org/jira/browse/FLINK-7433 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Priority: Minor > > {code} > try { > if (throwable != null) { > completed.put(requestId, throwable); > } else { > completed.put(requestId, path); > } > } finally { > inProgress.remove(jobId); > } > {code} > The call to inProgress.remove(jobId) should be protected by lock object. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7433) Lack of synchronization accessing inProgress in JobCancellationWithSavepointHandlers
[ https://issues.apache.org/jira/browse/FLINK-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-7433. --- Resolution: Duplicate > Lack of synchronization accessing inProgress in > JobCancellationWithSavepointHandlers > > > Key: FLINK-7433 > URL: https://issues.apache.org/jira/browse/FLINK-7433 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Priority: Minor > > {code} > try { > if (throwable != null) { > completed.put(requestId, throwable); > } else { > completed.put(requestId, path); > } > } finally { > inProgress.remove(jobId); > } > {code} > The call to inProgress.remove(jobId) should be protected by lock object. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7592) How to close the output log like "Connected to JobManager at Actor" and "Job execution switched to status RUNNING."?
[ https://issues.apache.org/jira/browse/FLINK-7592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175977#comment-16175977 ] Bowen Li commented on FLINK-7592: - Hi, are you requesting a feature or asking this specific question? If it's a feature request, can you please re-phrase its description? If you are asking questions, please use Flink's mailing list. JIRA is for keeping track of tasks, features, issues, and bugs. > How to close the output log like "Connected to JobManager at Actor" and "Job > execution switched to status RUNNING."? > > > Key: FLINK-7592 > URL: https://issues.apache.org/jira/browse/FLINK-7592 > Project: Flink > Issue Type: Wish > Components: Configuration >Affects Versions: 1.3.2 > Environment: jdk 1.8 >Reporter: brighteast > Labels: features > Fix For: 1.3.2 > > Original Estimate: 48h > Remaining Estimate: 48h > > Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1060959513] > with leader session id 59ff044a-ab2b-4518-b7b8-8352da361511. > 09/06/2017 15:06:35 Job execution switched to status RUNNING. > 09/06/2017 15:06:35 CHAIN DataSource (at > org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> > Combine(SUM(1))(1/1) switched to SCHEDULED > 09/06/2017 15:06:35 CHAIN DataSource (at > org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> > Combine(SUM(1))(1/1) switched to DEPLOYING > 09/06/2017 15:06:35 CHAIN DataSource (at > org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> > Combine(SUM(1))(1/1) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(2/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(1/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(3/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(4/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(5/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(6/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(7/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(8/8) switched to SCHEDULED > 09/06/2017 15:06:35 Reduce (SUM(1))(4/8) switched to DEPLOYING > 09/06/2017 15:06:35 Reduce (SUM(1))(8/8) switched to DEPLOYING > 09/06/2017 15:06:35 Reduce (SUM(1))(7/8) switched to DEPLOYING > 09/06/2017 15:06:35 Reduce (SUM(1))(6/8) switched to DEPLOYING > 09/06/2017 15:06:35 Reduce (SUM(1))(3/8) switched to DEPLOYING > 09/06/2017 15:06:35 Reduce (SUM(1))(1/8) switched to DEPLOYING > 09/06/2017 15:06:35 Reduce (SUM(1))(5/8) switched to DEPLOYING > 09/06/2017 15:06:35 CHAIN DataSource (at > org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601) > (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at > flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> > Combine(SUM(1))(1/1) switched to FINISHED > 09/06/2017 15:06:35 Reduce (SUM(1))(2/8) switched to DEPLOYING > 09/06/2017 15:06:35 Reduce (SUM(1))(5/8) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(8/8) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(7/8) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(2/8) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(3/8) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(6/8) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(4/8) switched to RUNNING > 09/06/2017 15:06:35 Reduce (SUM(1))(1/8) switched to RUNNING > 09/06/2017 15:06:35 DataSink (collect())(1/8) switched to SCHEDULED > 09/06/2017 15:06:35 DataSink (collect())(8/8) switched to SCHEDULED > 09/06/2017 15:06:35 DataSink (collect())(8/8) switched to DEPLOYING > 09/06/2017 15:06:35 DataSink (collect())(1/8) switched to DEPLOYING > 09/06/2017 15:06:35 DataSink (collect())(4/8) switched to SCHEDULED
[jira] [Commented] (FLINK-5726) Add the RocketMQ plugin for the Apache Flink
[ https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175963#comment-16175963 ] Bowen Li commented on FLINK-5726: - [~longda] Hi Longda, I'm curious that is Alibaba's BLINK using RocketMQ or Kafka? > Add the RocketMQ plugin for the Apache Flink > > > Key: FLINK-5726 > URL: https://issues.apache.org/jira/browse/FLINK-5726 > Project: Flink > Issue Type: Task > Components: Streaming Connectors >Reporter: Longda Feng >Priority: Minor > > Apache RocketMQ® is an open source distributed messaging and streaming data > platform. It has been used in a lot of companies. Please refer to > http://rocketmq.incubator.apache.org/ for more details. > Since the Apache RocketMq 4.0 will be released in the next few days, we can > start the job of adding the RocketMq plugin for the Apache Flink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175953#comment-16175953 ] ASF GitHub Bot commented on FLINK-7378: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4485 @NicoK , I missed that message of `verifyAllBuffersReturned()` issue before. I have submitted the modifications of it. :) > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}} directly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4485 @NicoK , I missed that message of `verifyAllBuffersReturned()` issue before. I have submitted the modifications of it. :) ---
[GitHub] flink pull request #4703: [FLINK-7659][REST] Unprotected access to inProgres...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4703 [FLINK-7659][REST] Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest ## What is the purpose of the change There's a thread accessing `inProgress` without proper synchronization ## Brief change log Add synchronization to access `inProgress` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7659 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4703.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4703 commit 6b7904bcfe809542216e12e4b48933fcb47d2d1d Author: Bowen Li Date: 2017-09-22T05:33:17Z FLINK-7659 Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest ---
[jira] [Commented] (FLINK-7659) Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest
[ https://issues.apache.org/jira/browse/FLINK-7659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175942#comment-16175942 ] ASF GitHub Bot commented on FLINK-7659: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4703 [FLINK-7659][REST] Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest ## What is the purpose of the change There's a thread accessing `inProgress` without proper synchronization ## Brief change log Add synchronization to access `inProgress` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7659 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4703.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4703 commit 6b7904bcfe809542216e12e4b48933fcb47d2d1d Author: Bowen Li Date: 2017-09-22T05:33:17Z FLINK-7659 Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest > Unprotected access to inProgress in > JobCancellationWithSavepointHandlers#handleNewRequest > - > > Key: FLINK-7659 > URL: https://issues.apache.org/jira/browse/FLINK-7659 > Project: Flink > Issue Type: Bug > Components: REST >Reporter: Ted Yu >Assignee: Bowen Li > > Here is related code: > {code} > } finally { > inProgress.remove(jobId); > } > {code} > A little lower, in another finally block, there is: > {code} > synchronized (lock) { > if (!success) { > inProgress.remove(jobId); > {code} > which is correct. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7659) Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest
[ https://issues.apache.org/jira/browse/FLINK-7659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7659: Component/s: REST > Unprotected access to inProgress in > JobCancellationWithSavepointHandlers#handleNewRequest > - > > Key: FLINK-7659 > URL: https://issues.apache.org/jira/browse/FLINK-7659 > Project: Flink > Issue Type: Bug > Components: REST >Reporter: Ted Yu >Assignee: Bowen Li > > Here is related code: > {code} > } finally { > inProgress.remove(jobId); > } > {code} > A little lower, in another finally block, there is: > {code} > synchronized (lock) { > if (!success) { > inProgress.remove(jobId); > {code} > which is correct. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7659) Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest
[ https://issues.apache.org/jira/browse/FLINK-7659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7659: --- Assignee: Bowen Li > Unprotected access to inProgress in > JobCancellationWithSavepointHandlers#handleNewRequest > - > > Key: FLINK-7659 > URL: https://issues.apache.org/jira/browse/FLINK-7659 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Bowen Li > > Here is related code: > {code} > } finally { > inProgress.remove(jobId); > } > {code} > A little lower, in another finally block, there is: > {code} > synchronized (lock) { > if (!success) { > inProgress.remove(jobId); > {code} > which is correct. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175888#comment-16175888 ] ASF GitHub Bot commented on FLINK-7635: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4702 @aljoscha @chenqin Hi guys, can you please take a look? I'll write doc about this feature with [FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660). BTW, this is my first time writing Scala > Support sideOutput in ProcessWindowFunciton > --- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this is good warm up task for ppl to learn how window > function works in general. Otherwise feel free to assign back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4702: [FLINK-7635][DataStream API][Scala API] Support sideOutpu...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4702 @aljoscha @chenqin Hi guys, can you please take a look? I'll write doc about this feature with [FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660). BTW, this is my first time writing Scala ---
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175800#comment-16175800 ] ASF GitHub Bot commented on FLINK-7635: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4702 [FLINK-7635][DataStream API][Scala API] Support sideOutput in ProcessWindowFunciton ## What is the purpose of the change Add support for sideOutput in ProcessWindowFunciton ## Verifying this change This change added tests and can be verified as follows: - Added integration tests for DataStream API in `flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java` - Added integration tests for Scala API in `flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala` ## Does this pull request potentially affect one of the following parts: - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (I will add document for this feature and [FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660) together when implementing FLINK-7660) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7635 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4702.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4702 commit 6386983239bd3024b395c865ec4fd33e232ca5a3 Author: Bowen Li Date: 2017-08-30T16:35:03Z FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in flink-connector-kinesis commit 381cd4156b84673a1d32d2db3f7b2d748d90d980 Author: Bowen Li Date: 2017-09-07T06:33:37Z Merge remote-tracking branch 'upstream/master' commit dcf40bd821187b848d924f7f4df6805b1b924c16 Author: Bowen Li Date: 2017-09-15T18:00:03Z Merge remote-tracking branch 'upstream/master' commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338 Author: Bowen Li Date: 2017-09-18T06:25:26Z Merge remote-tracking branch 'upstream/master' commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e Author: Bowen Li Date: 2017-09-18T23:50:48Z Merge remote-tracking branch 'upstream/master' commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734 Author: Bowen Li Date: 2017-09-19T17:18:54Z Merge remote-tracking branch 'upstream/master' commit fb88d61579cbc602b21f96ffca5c189aa846bec1 Author: Bowen Li Date: 2017-09-20T05:56:14Z Merge branch 'master' of github.com:bowenli86/flink commit 3538421e6e8d194fe838f662480f44b57a41e5e2 Author: Bowen Li Date: 2017-09-20T06:35:34Z add full path of WindowWordCount to doc to remove warning from IDE commit e08f099465f495401efad1a4d8f54a19735df76c Author: Bowen Li Date: 2017-09-20T18:08:06Z format code and doc commit 83fca0651f8d342aea83291bfe4d45bc8df646d2 Author: Bowen Li Date: 2017-09-21T19:13:17Z update window doc commit a90e18102e8f85d305cd5f2f077db202f471023b Author: Bowen Li Date: 2017-09-22T00:56:03Z FLINK-7635 Support sideOutput in ProcessWindowFunciton commit 76e22afa3030db9dd555608168f7cf408696d1a7 Author: Bowen Li Date: 2017-09-22T01:07:02Z add output to InternalProcessApplyWindowContext commit 3da7ce9c439fb58c898f34acc87e11621df68947 Author: Bowen Li Date: 2017-09-22T01:57:03Z add scala API > Support sideOutput in ProcessWindowFunciton > --- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this is good warm up task for ppl to learn how window > function works in general. Otherwise feel free to assign back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4702 [FLINK-7635][DataStream API][Scala API] Support sideOutput in ProcessWindowFunciton ## What is the purpose of the change Add support for sideOutput in ProcessWindowFunciton ## Verifying this change This change added tests and can be verified as follows: - Added integration tests for DataStream API in `flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java` - Added integration tests for Scala API in `flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala` ## Does this pull request potentially affect one of the following parts: - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (I will add document for this feature and [FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660) together when implementing FLINK-7660) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7635 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4702.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4702 commit 6386983239bd3024b395c865ec4fd33e232ca5a3 Author: Bowen Li Date: 2017-08-30T16:35:03Z FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in flink-connector-kinesis commit 381cd4156b84673a1d32d2db3f7b2d748d90d980 Author: Bowen Li Date: 2017-09-07T06:33:37Z Merge remote-tracking branch 'upstream/master' commit dcf40bd821187b848d924f7f4df6805b1b924c16 Author: Bowen Li Date: 2017-09-15T18:00:03Z Merge remote-tracking branch 'upstream/master' commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338 Author: Bowen Li Date: 2017-09-18T06:25:26Z Merge remote-tracking branch 'upstream/master' commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e Author: Bowen Li Date: 2017-09-18T23:50:48Z Merge remote-tracking branch 'upstream/master' commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734 Author: Bowen Li Date: 2017-09-19T17:18:54Z Merge remote-tracking branch 'upstream/master' commit fb88d61579cbc602b21f96ffca5c189aa846bec1 Author: Bowen Li Date: 2017-09-20T05:56:14Z Merge branch 'master' of github.com:bowenli86/flink commit 3538421e6e8d194fe838f662480f44b57a41e5e2 Author: Bowen Li Date: 2017-09-20T06:35:34Z add full path of WindowWordCount to doc to remove warning from IDE commit e08f099465f495401efad1a4d8f54a19735df76c Author: Bowen Li Date: 2017-09-20T18:08:06Z format code and doc commit 83fca0651f8d342aea83291bfe4d45bc8df646d2 Author: Bowen Li Date: 2017-09-21T19:13:17Z update window doc commit a90e18102e8f85d305cd5f2f077db202f471023b Author: Bowen Li Date: 2017-09-22T00:56:03Z FLINK-7635 Support sideOutput in ProcessWindowFunciton commit 76e22afa3030db9dd555608168f7cf408696d1a7 Author: Bowen Li Date: 2017-09-22T01:07:02Z add output to InternalProcessApplyWindowContext commit 3da7ce9c439fb58c898f34acc87e11621df68947 Author: Bowen Li Date: 2017-09-22T01:57:03Z add scala API ---
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175658#comment-16175658 ] Till Rohrmann commented on FLINK-4319: -- Hi [~suez1224], thanks for testing out the current state of Flip-6. This is valuable feedback which we have to address. 1) It's true that we don't wait on the completion of the {{JobMaster}} shutdown in the {{JobManagerRunner}} yet. However, I thought that the {{JobClusterEntrypoint}} should call {{System.exit}} upon termination of the job (at least eventually). This might point towards a bug. 2) The RM stops TMs after they were idle for a certain time (10s). However, the stopping logic is not yet implemented for the {{YarnResourceManager}}. See YarnResourceManager.java:231. This should be fairly straight forward. See FLINK-7076 for more information. 3) We are currently working on porting the web dashboard to the new architecture. See FLINK-7530 for the current state. If you like, then you could take over FLINK-7076 since we didn't make much progress there. Also FLINK-7530 could benefit from helping hands :-) > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Till Rohrmann > Labels: flip-6 > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175656#comment-16175656 ] Till Rohrmann commented on FLINK-7076: -- Hi [~yuemeng], are you still working on this? > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: yuemeng > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175640#comment-16175640 ] ASF GitHub Bot commented on FLINK-7647: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140379775 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java --- @@ -55,16 +59,19 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { private final GatewayRetriever leaderRetriever; + private final Configuration clusterConfiguration; private final RestHandlerConfiguration restConfiguration; private final Executor executor; public DispatcherRestEndpoint( - RestServerEndpointConfiguration configuration, + Configuration clusterConfiguration, + RestServerEndpointConfiguration endpointConfiguration, --- End diff -- Shall we remove the order between `clusterConfiguration` and `endpointConfiguration`? I usually like to have the super class arguments first. > Port JobManagerConfigHandler to new REST endpoint > - > > Key: FLINK-7647 > URL: https://issues.apache.org/jira/browse/FLINK-7647 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobManagerConfigHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140380156 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java --- @@ -33,20 +43,27 @@ /** * Returns the Job Manager's configuration. */ -public class JobManagerConfigHandler extends AbstractJsonRequestHandler { - - private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config"; +public class ClusterConfigHandler extends AbstractJsonRequestHandler + implements LegacyRestHandler { private final Configuration config; - public JobManagerConfigHandler(Executor executor, Configuration config) { + public ClusterConfigHandler(Executor executor, Configuration config) { super(executor); - this.config = config; + this.config = Preconditions.checkNotNull(config); } @Override public String[] getPaths() { - return new String[]{JOBMANAGER_CONFIG_REST_PATH}; + return new String[]{ClusterConfigurationHeaders.CLUSTER_CONFIG_REST_PATH}; + } + + @Override + public CompletableFuture handleRequest( + HandlerRequest request, + DispatcherGateway gateway) { + + return CompletableFuture.supplyAsync(() -> ClusterConfiguration.from(config), executor); --- End diff -- Maybe we could generate the `ClusterConfiguration` instance once at creation time and then always return this element as a `CompletableFuture.completedFuture(clusterConfiguration)`. ---
[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175639#comment-16175639 ] ASF GitHub Bot commented on FLINK-7647: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140381187 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; + +/** + * Response of the {@link ClusterConfigHandler}, respresented as a list + * of key-value pairs of the cluster {@link Configuration}. + */ +public class ClusterConfiguration extends ArrayList implements ResponseBody { --- End diff -- There is already another class called `ClusterConfiguration`. Maybe we should rename it in order to disambiguate it. > Port JobManagerConfigHandler to new REST endpoint > - > > Key: FLINK-7647 > URL: https://issues.apache.org/jira/browse/FLINK-7647 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobManagerConfigHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175638#comment-16175638 ] ASF GitHub Bot commented on FLINK-7647: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140380156 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java --- @@ -33,20 +43,27 @@ /** * Returns the Job Manager's configuration. */ -public class JobManagerConfigHandler extends AbstractJsonRequestHandler { - - private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config"; +public class ClusterConfigHandler extends AbstractJsonRequestHandler + implements LegacyRestHandler { private final Configuration config; - public JobManagerConfigHandler(Executor executor, Configuration config) { + public ClusterConfigHandler(Executor executor, Configuration config) { super(executor); - this.config = config; + this.config = Preconditions.checkNotNull(config); } @Override public String[] getPaths() { - return new String[]{JOBMANAGER_CONFIG_REST_PATH}; + return new String[]{ClusterConfigurationHeaders.CLUSTER_CONFIG_REST_PATH}; + } + + @Override + public CompletableFuture handleRequest( + HandlerRequest request, + DispatcherGateway gateway) { + + return CompletableFuture.supplyAsync(() -> ClusterConfiguration.from(config), executor); --- End diff -- Maybe we could generate the `ClusterConfiguration` instance once at creation time and then always return this element as a `CompletableFuture.completedFuture(clusterConfiguration)`. > Port JobManagerConfigHandler to new REST endpoint > - > > Key: FLINK-7647 > URL: https://issues.apache.org/jira/browse/FLINK-7647 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobManagerConfigHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140381187 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; + +/** + * Response of the {@link ClusterConfigHandler}, respresented as a list + * of key-value pairs of the cluster {@link Configuration}. + */ +public class ClusterConfiguration extends ArrayList implements ResponseBody { --- End diff -- There is already another class called `ClusterConfiguration`. Maybe we should rename it in order to disambiguate it. ---
[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175637#comment-16175637 ] ASF GitHub Bot commented on FLINK-7647: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140382392 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; + +/** + * Response of the {@link ClusterConfigHandler}, respresented as a list + * of key-value pairs of the cluster {@link Configuration}. + */ +public class ClusterConfiguration extends ArrayList implements ResponseBody { --- End diff -- I'm actually wondering why we don't extend from a `Map` implementation since it should behave more like a map. This of course would require changes on the web gui side. I think now, the JSON would like ``` [{"key":"keyvalue", "value":"valuevalue"}, {}] ``` instead of ``` {"keyvalue":"valuevalue", } ``` Maybe we could change this in the future. > Port JobManagerConfigHandler to new REST endpoint > - > > Key: FLINK-7647 > URL: https://issues.apache.org/jira/browse/FLINK-7647 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobManagerConfigHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175641#comment-16175641 ] ASF GitHub Bot commented on FLINK-7647: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140381079 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; + +/** + * Response of the {@link ClusterConfigHandler}, respresented as a list + * of key-value pairs of the cluster {@link Configuration}. + */ +public class ClusterConfiguration extends ArrayList implements ResponseBody { --- End diff -- Serial version uid missing. > Port JobManagerConfigHandler to new REST endpoint > - > > Key: FLINK-7647 > URL: https://issues.apache.org/jira/browse/FLINK-7647 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobManagerConfigHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140382392 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; + +/** + * Response of the {@link ClusterConfigHandler}, respresented as a list + * of key-value pairs of the cluster {@link Configuration}. + */ +public class ClusterConfiguration extends ArrayList implements ResponseBody { --- End diff -- I'm actually wondering why we don't extend from a `Map` implementation since it should behave more like a map. This of course would require changes on the web gui side. I think now, the JSON would like ``` [{"key":"keyvalue", "value":"valuevalue"}, {}] ``` instead of ``` {"keyvalue":"valuevalue", } ``` Maybe we could change this in the future. ---
[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140381079 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.messages; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.ArrayList; + +/** + * Response of the {@link ClusterConfigHandler}, respresented as a list + * of key-value pairs of the cluster {@link Configuration}. + */ +public class ClusterConfiguration extends ArrayList implements ResponseBody { --- End diff -- Serial version uid missing. ---
[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4691#discussion_r140379775 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java --- @@ -55,16 +59,19 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { private final GatewayRetriever leaderRetriever; + private final Configuration clusterConfiguration; private final RestHandlerConfiguration restConfiguration; private final Executor executor; public DispatcherRestEndpoint( - RestServerEndpointConfiguration configuration, + Configuration clusterConfiguration, + RestServerEndpointConfiguration endpointConfiguration, --- End diff -- Shall we remove the order between `clusterConfiguration` and `endpointConfiguration`? I usually like to have the super class arguments first. ---
[GitHub] flink pull request #4689: [FLINK-7655] [flip6] Set fencing token to null if ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4689 ---
[jira] [Commented] (FLINK-7655) Revisit default non-leader id for FencedRpcEndpoints
[ https://issues.apache.org/jira/browse/FLINK-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175614#comment-16175614 ] ASF GitHub Bot commented on FLINK-7655: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4689 > Revisit default non-leader id for FencedRpcEndpoints > > > Key: FLINK-7655 > URL: https://issues.apache.org/jira/browse/FLINK-7655 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > Currently, when a {{FencedRpcEndpoint}} loses leadership, we set its leader > id to a random value. This can be problematic, even though it's unlikely, > because we might set it to a value which is used somewhere else (e.g. the > currently valid leader id). I think it would be better to simply set the > leader id to {{null}} in order to properly encode that the > {{FencedRpcEndpoint}} is no longer a leader. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7655) Revisit default non-leader id for FencedRpcEndpoints
[ https://issues.apache.org/jira/browse/FLINK-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7655. Resolution: Fixed Fix Version/s: 1.4.0 Fixed via 42cc3a2a9c41dda7cf338db36b45131db9150674 > Revisit default non-leader id for FencedRpcEndpoints > > > Key: FLINK-7655 > URL: https://issues.apache.org/jira/browse/FLINK-7655 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > Currently, when a {{FencedRpcEndpoint}} loses leadership, we set its leader > id to a random value. This can be problematic, even though it's unlikely, > because we might set it to a value which is used somewhere else (e.g. the > currently valid leader id). I think it would be better to simply set the > leader id to {{null}} in order to properly encode that the > {{FencedRpcEndpoint}} is no longer a leader. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7655) Revisit default non-leader id for FencedRpcEndpoints
[ https://issues.apache.org/jira/browse/FLINK-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175609#comment-16175609 ] ASF GitHub Bot commented on FLINK-7655: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4689 Travis passed. Merging this PR. > Revisit default non-leader id for FencedRpcEndpoints > > > Key: FLINK-7655 > URL: https://issues.apache.org/jira/browse/FLINK-7655 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, when a {{FencedRpcEndpoint}} loses leadership, we set its leader > id to a random value. This can be problematic, even though it's unlikely, > because we might set it to a value which is used somewhere else (e.g. the > currently valid leader id). I think it would be better to simply set the > leader id to {{null}} in order to properly encode that the > {{FencedRpcEndpoint}} is no longer a leader. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4689: [FLINK-7655] [flip6] Set fencing token to null if not lea...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4689 Travis passed. Merging this PR. ---
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175494#comment-16175494 ] Shuyi Chen commented on FLINK-4319: --- Hi [~till.rohrmann], thanks a lot for the pointers. I was trying out the current status of Flip-6 flink-on-yarn. I found several problems: 1) JobMaster wont get killed after the job is finished. It seems the JobMaster never received the SHUTDOWN akka message. Maybe it's related to https://issues.apache.org/jira/browse/FLINK-5176? 2) The TaskManager wont get released after the job died. Maybe this is related to 1). 3) The web dashboard is no longer accessible. Can you shed some lights on these issues? I would be happy to help if needed. Thanks. > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Till Rohrmann > Labels: flip-6 > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks
Ufuk Celebi created FLINK-7666: -- Summary: ContinuousFileReaderOperator swallows chained watermarks Key: FLINK-7666 URL: https://issues.apache.org/jira/browse/FLINK-7666 Project: Flink Issue Type: Improvement Components: Streaming Connectors Affects Versions: 1.3.2 Reporter: Ufuk Celebi I use event time and read from a (finite) file. I assign watermarks right after the {{ContinuousFileReaderOperator}} with parallelism 1. {code} env .readFile(new TextInputFormat(...), ...) .setParallelism(1) .assignTimestampsAndWatermarks(...) .setParallelism(1) .map()... {code} The watermarks I assign never progress through the pipeline. I can work around this by inserting a {{shuffle()}} after the file reader or starting a new chain at the assigner: {code} env .readFile(new TextInputFormat(...), ...) .setParallelism(1) .shuffle() .assignTimestampsAndWatermarks(...) .setParallelism(1) .map()... {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7665) Use wait/notify in ContinuousFileReaderOperator
Ufuk Celebi created FLINK-7665: -- Summary: Use wait/notify in ContinuousFileReaderOperator Key: FLINK-7665 URL: https://issues.apache.org/jira/browse/FLINK-7665 Project: Flink Issue Type: Improvement Components: Streaming Connectors Affects Versions: 1.4.0 Reporter: Ufuk Celebi Priority: Minor {{ContinuousFileReaderOperator}} has the following loop to receive input splits: {code} synchronized (checkpointLock) { if (currentSplit == null) { currentSplit = this.pendingSplits.poll(); if (currentSplit == null) { if (this.shouldClose) { isRunning = false; } else { checkpointLock.wait(50); } continue; } } } {code} I think we can replace this with a {{wait()}} and {{notify()}} in {{addSplit}} and {{close}}. If there is a reason to keep the {{wait(50)}}, feel free to close this issue. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7664) Replace FlinkFutureException by CompletionException
[ https://issues.apache.org/jira/browse/FLINK-7664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175144#comment-16175144 ] ASF GitHub Bot commented on FLINK-7664: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4701 [FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException ## What is the purpose of the change FlinkFutureException was introduced to fail a CompletableFuture callback. However, there was already such a class which allows to better handle failures in different stages which is the java.util.CompletionException. Therefore we replace FlinkFutureException by CompletionException and remove the former. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink removeFlinkFutureException Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4701.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4701 commit 3b94ca218e6ed4c8fc0fe491d33c072473651a9b Author: Till Rohrmann Date: 2017-09-21T17:21:07Z [FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException FlinkFutureException was introduced to fail a CompletableFuture callback. However, there was already such a class which allows to better handle failures in different stages which is the java.util.CompletionException. Therefore we replace FlinkFutureException by CompletionException and remove the former. > Replace FlinkFutureException by CompletionException > --- > > Key: FLINK-7664 > URL: https://issues.apache.org/jira/browse/FLINK-7664 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{FlinkFutureException}} was introduced to fail in a > {{CompletableFuture}} callback. This can, however, better be done via the > {{CompletionException}}. Therefore, we should remove the > {{FlinkFutureException}} and replace it with the {{CompletionException}} > instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4701: [FLINK-7664] Replace FlinkFutureException by java....
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4701 [FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException ## What is the purpose of the change FlinkFutureException was introduced to fail a CompletableFuture callback. However, there was already such a class which allows to better handle failures in different stages which is the java.util.CompletionException. Therefore we replace FlinkFutureException by CompletionException and remove the former. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink removeFlinkFutureException Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4701.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4701 commit 3b94ca218e6ed4c8fc0fe491d33c072473651a9b Author: Till Rohrmann Date: 2017-09-21T17:21:07Z [FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException FlinkFutureException was introduced to fail a CompletableFuture callback. However, there was already such a class which allows to better handle failures in different stages which is the java.util.CompletionException. Therefore we replace FlinkFutureException by CompletionException and remove the former. ---
[jira] [Created] (FLINK-7664) Replace FlinkFutureException by CompletionException
Till Rohrmann created FLINK-7664: Summary: Replace FlinkFutureException by CompletionException Key: FLINK-7664 URL: https://issues.apache.org/jira/browse/FLINK-7664 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann The {{FlinkFutureException}} was introduced to fail in a {{CompletableFuture}} callback. This can, however, better be done via the {{CompletionException}}. Therefore, we should remove the {{FlinkFutureException}} and replace it with the {{CompletionException}} instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175091#comment-16175091 ] ASF GitHub Bot commented on FLINK-7636: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140292203 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. +* +* @param newTablenew flink table +* @param typeFactory type factory to create new row type of new flink table +* @return The copy of this Flink RelOptTable with new Flink table and new row type +*/ + def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = { +val newRowType = newTable.getRowType(typeFactory) +FlinkRelOptTable.create(schema, newRowType, names, newTable) + } + + /** +* Extends a table with the given extra fields, which is not supported now. +* +* @param extendedFields +* @return +*/ + override def extend(extendedFields: JList[RelDataTypeField]): RelOptTable = +throw new UnsupportedOperationException + + /** +* Obtains an identifier for this table. +* +* @return qualified name +*/ + override def getQualifiedName: JList[String] = names + + + /** +* Obtains the access type of the table. +* +* @return all access types including SELECT/UPDATE/INSERT/DELETE +*/ + override def getAllowedAccess: SqlAccessType = SqlAccessType.ALL --- End diff -- I think we should make this depending on the type of `table`. With [FLINK-6442](https://issues.apache.org/jira/browse/FLINK-6442) we added support to register `TableSink` and `INSERT INTO sinkTable SELECT`. So if `table` is backed by a `DataSet`, `DataStream`, or `TableSource` is supports `SELECT`. If `table` is backed by a `TableSink` it supports `INSE
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175093#comment-16175093 ] ASF GitHub Bot commented on FLINK-7636: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140290245 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], --- End diff -- rename parameter to `qualifiedName` > Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan > node constructor > -- > > Key: FLINK-7636 > URL: https://issues.apache.org/jira/browse/FLINK-7636 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: jingzhang >Assignee: jingzhang > > At present, there are two ways to fetch TableSource of a TableSourceScan node > (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...): > 1. > {code} > val relOptTable: RelOptTable = getTable() > val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]]) > val tableSouce = tableSourceTable.tableSource > {code} > the result of getTable() is instance of RelOptTableImpl now, and it will not > change after RelNode tree is built. > 2. now all TableSourceScan contains a tablesource as constructor parameter, > so we could fetch the tablesource directly later. > > The result tableSource is different with each other by above two ways after > apply project push(PPD) down or filter push down(FPD). It is very confusing. > we hope to fix the problem by introducing FlinkRelOptTable to replace > RelOptTableImpl, and remove tableSource parameter from TableSourceScan's > constructor. After PPD or FPD, a new FlinkRelOptTable instance which > contains a new TableSourceTable will be passed to TableSourceScan > constructor. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175090#comment-16175090 ] ASF GitHub Bot commented on FLINK-7636: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140291045 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. +* +* @param newTablenew flink table +* @param typeFactory type factory to create new row type of new flink table +* @return The copy of this Flink RelOptTable with new Flink table and new row type +*/ + def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = { +val newRowType = newTable.getRowType(typeFactory) +FlinkRelOptTable.create(schema, newRowType, names, newTable) + } + + /** +* Extends a table with the given extra fields, which is not supported now. +* +* @param extendedFields --- End diff -- Remove is no parameter description is provided > Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan > node constructor > -- > > Key: FLINK-7636 > URL: https://issues.apache.org/jira/browse/FLINK-7636 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: jingzhang >Assignee: jingzhang > > At present, there are two ways to fetch TableSource of a TableSourceScan node > (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...): > 1. > {code} > val relOptTable: RelOptTable = getTable() > val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]]) > val tableSouce = tableSourceTable.tableSource > {code} > the result of getTable() is instance of RelOptTableImpl now, and it will not > change after RelNode
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175092#comment-16175092 ] ASF GitHub Bot commented on FLINK-7636: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140297123 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { --- End diff -- It looks as if most of the code was copied from `RelOptTableImpl`. I think a few methods can be simplified because some of the code does not apply to Flink and how we use Calcite. > Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan > node constructor > -- > > Key: FLINK-7636 > URL: https://issues.apache.org/jira/browse/FLINK-7636 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: jingzhang >Assignee: jingzhang > > At present, there are two ways to fetch TableSource of a TableSourceScan node > (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...): > 1. > {code} > val relOptTable: RelOptTable = getTable() > val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]]) > val tableSouce = tableSourceTable.tableSource > {code} > the result of getTable() is instance of RelOptTableImpl now, and it will not > change after RelNode tree is built. > 2. now all TableSourceScan contains a tablesource as constructor parameter, > so we could fetch the tablesource directly later. > > The result tableSource is different with each other by above two ways after > apply project push(PPD) down or filter push down(FPD). It is very confusing. > we hope to fix the problem by introducing FlinkRelOptTable to replace > RelOptTableImpl, and remove tableSource parameter from TableSourceScan's > constructor. After PPD or FPD, a new FlinkRelOptTable instance which > contains a new TableSourceTable will be passed to TableSourceSca
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140297123 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { --- End diff -- It looks as if most of the code was copied from `RelOptTableImpl`. I think a few methods can be simplified because some of the code does not apply to Flink and how we use Calcite. ---
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140291045 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. +* +* @param newTablenew flink table +* @param typeFactory type factory to create new row type of new flink table +* @return The copy of this Flink RelOptTable with new Flink table and new row type +*/ + def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = { +val newRowType = newTable.getRowType(typeFactory) +FlinkRelOptTable.create(schema, newRowType, names, newTable) + } + + /** +* Extends a table with the given extra fields, which is not supported now. +* +* @param extendedFields --- End diff -- Remove is no parameter description is provided ---
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140292203 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. +* +* @param newTablenew flink table +* @param typeFactory type factory to create new row type of new flink table +* @return The copy of this Flink RelOptTable with new Flink table and new row type +*/ + def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = { +val newRowType = newTable.getRowType(typeFactory) +FlinkRelOptTable.create(schema, newRowType, names, newTable) + } + + /** +* Extends a table with the given extra fields, which is not supported now. +* +* @param extendedFields +* @return +*/ + override def extend(extendedFields: JList[RelDataTypeField]): RelOptTable = +throw new UnsupportedOperationException + + /** +* Obtains an identifier for this table. +* +* @return qualified name +*/ + override def getQualifiedName: JList[String] = names + + + /** +* Obtains the access type of the table. +* +* @return all access types including SELECT/UPDATE/INSERT/DELETE +*/ + override def getAllowedAccess: SqlAccessType = SqlAccessType.ALL --- End diff -- I think we should make this depending on the type of `table`. With [FLINK-6442](https://issues.apache.org/jira/browse/FLINK-6442) we added support to register `TableSink` and `INSERT INTO sinkTable SELECT`. So if `table` is backed by a `DataSet`, `DataStream`, or `TableSource` is supports `SELECT`. If `table` is backed by a `TableSink` it supports `INSERT` (`DELETE` and `UPDATE` might be added later). ---
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140290245 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], --- End diff -- rename parameter to `qualifiedName` ---
[jira] [Commented] (FLINK-7649) Port JobStoppingHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175080#comment-16175080 ] ASF GitHub Bot commented on FLINK-7649: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4700 [FLINK-7649] [flip6] Extend JobTerminationHandler to support stop ## What is the purpose of the change Rename the JobCancellationHandler into JobTerminationHandler which is now responsible for terminating jobs. Moreover, this commits adds two termination modes, cancel and stop, which are specified by providing a query parameter. ## Brief change log - Rename `JobCancellationHandler` into `JobTerminationHandler` being now responsible for the job termination (cancelling and stopping so far) - Introduce a `TerminationModeQueryParameter` which allows to specify the termination mode (`stop` and `cancel`) - Add `stopJob` to `DispatcherGateway` and `stop` to `JobMasterGateway` - Adapt `JobTerminationHandler` to either call `cancelJob` or `stopJob` on the `Dispatcher` depending on the mode (default is `cancel`) ## Verifying this change Tested manual that the right termination call is executed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink portJobStoppingHandler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4700.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4700 commit afa456f68a865ca5cf4aa1f60c2f1d43ff471e1b Author: Till Rohrmann Date: 2017-09-21T08:53:24Z [FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint Let the JobCancellationHandler implement the LegacyRestHandler interface. Moreover, this commit adds the DELETE method to HttpMethodWrapper and the RestServerEndpoint#registerHandler method. commit a8f695607a1aa48d2af9175c47ed26dd965d9c05 Author: Till Rohrmann Date: 2017-09-21T14:47:18Z [FLINK-7649] [flip6] Extend JobTerminationHandler to support stop Rename the JobCancellationHandler into JobTerminationHandler which is now responsible for terminating jobs. Moreover, this commits adds two termination modes, cancel and stop, which are specified by providing a query parameter. > Port JobStoppingHandler to new REST endpoint > > > Key: FLINK-7649 > URL: https://issues.apache.org/jira/browse/FLINK-7649 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobStoppingHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4700: [FLINK-7649] [flip6] Extend JobTerminationHandler ...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4700 [FLINK-7649] [flip6] Extend JobTerminationHandler to support stop ## What is the purpose of the change Rename the JobCancellationHandler into JobTerminationHandler which is now responsible for terminating jobs. Moreover, this commits adds two termination modes, cancel and stop, which are specified by providing a query parameter. ## Brief change log - Rename `JobCancellationHandler` into `JobTerminationHandler` being now responsible for the job termination (cancelling and stopping so far) - Introduce a `TerminationModeQueryParameter` which allows to specify the termination mode (`stop` and `cancel`) - Add `stopJob` to `DispatcherGateway` and `stop` to `JobMasterGateway` - Adapt `JobTerminationHandler` to either call `cancelJob` or `stopJob` on the `Dispatcher` depending on the mode (default is `cancel`) ## Verifying this change Tested manual that the right termination call is executed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink portJobStoppingHandler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4700.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4700 commit afa456f68a865ca5cf4aa1f60c2f1d43ff471e1b Author: Till Rohrmann Date: 2017-09-21T08:53:24Z [FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint Let the JobCancellationHandler implement the LegacyRestHandler interface. Moreover, this commit adds the DELETE method to HttpMethodWrapper and the RestServerEndpoint#registerHandler method. commit a8f695607a1aa48d2af9175c47ed26dd965d9c05 Author: Till Rohrmann Date: 2017-09-21T14:47:18Z [FLINK-7649] [flip6] Extend JobTerminationHandler to support stop Rename the JobCancellationHandler into JobTerminationHandler which is now responsible for terminating jobs. Moreover, this commits adds two termination modes, cancel and stop, which are specified by providing a query parameter. ---
[GitHub] flink pull request #4699: [FLINK-7663] [flip6] Return BAD_REQUEST if Handler...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4699 [FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created ## What is the purpose of the change This commit changes the behaviour such that a failure in creating a `HandlerRequest` will result in a BAD_REQUEST response by the AbstractRestHandler. ## Brief change log - Introduce the `HandlerRequestException` indicating that the `HandlerRequest` could not be created - Introduce a checked `ConversionException` for `MessageParameter#convertFromString` to let conversions fail explicitly - Send `BAD_REQUEST` response in case that the `HandlerRequest` could not be created ## Verifying this change - Added `RestEndpointITCase#testBadHandlerRequest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) R @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink supportBadRestRequests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4699.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4699 commit 38c8a188ad72a1ce48129077b71df971ade44354 Author: Till Rohrmann Date: 2017-09-21T16:14:45Z [FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created This commit changes the behaviour such that a failure in creating a HandlerRequest will result in a BAD_REQUEST response by the AbstractRestHandler. ---
[jira] [Commented] (FLINK-7663) Allow AbstractRestHandler to handle bad requests
[ https://issues.apache.org/jira/browse/FLINK-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175050#comment-16175050 ] ASF GitHub Bot commented on FLINK-7663: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4699 [FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created ## What is the purpose of the change This commit changes the behaviour such that a failure in creating a `HandlerRequest` will result in a BAD_REQUEST response by the AbstractRestHandler. ## Brief change log - Introduce the `HandlerRequestException` indicating that the `HandlerRequest` could not be created - Introduce a checked `ConversionException` for `MessageParameter#convertFromString` to let conversions fail explicitly - Send `BAD_REQUEST` response in case that the `HandlerRequest` could not be created ## Verifying this change - Added `RestEndpointITCase#testBadHandlerRequest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) R @zentol You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink supportBadRestRequests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4699.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4699 commit 38c8a188ad72a1ce48129077b71df971ade44354 Author: Till Rohrmann Date: 2017-09-21T16:14:45Z [FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created This commit changes the behaviour such that a failure in creating a HandlerRequest will result in a BAD_REQUEST response by the AbstractRestHandler. > Allow AbstractRestHandler to handle bad requests > > > Key: FLINK-7663 > URL: https://issues.apache.org/jira/browse/FLINK-7663 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > The {{AbstractRestHandler}} parses the request and tries to generate a > {{HandlerRequest}}. If this fails, then the server answers with an internal > server error. Instead we should allow the {{AbstractRestHandler}} to be able > to return a BAD_REQUEST status code. In order to do that, I would like to > introduce a {{HandlerRequestException}} which can be thrown while creating > the {{HandlerRequest}}. If this exception is thrown, then we return an error > message with {{BAD_REQUEST}} {{HttpResponseStatus}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175036#comment-16175036 ] ASF GitHub Bot commented on FLINK-7638: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140291476 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); --- End diff -- Will change it. > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175034#comment-16175034 ] ASF GitHub Bot commented on FLINK-7638: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140291389 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); - for (JobDetails detail : result.getFinishedJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getFinished()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); } else { --- End diff -- Yes, because I think it's not so nice to return different answer formats depending on how the handler was instantiated. The respective information can be easily extracted from the complete overview. Moreover, I think we never used these endpoints in the first place. > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175035#comment-16175035 ] ASF GitHub Bot commented on FLINK-7638: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140291439 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); --- End diff -- True, will change it. > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140291439 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); --- End diff -- True, will change it. ---
[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140291476 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); --- End diff -- Will change it. ---
[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140291389 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); - for (JobDetails detail : result.getFinishedJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getFinished()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); } else { --- End diff -- Yes, because I think it's not so nice to return different answer formats depending on how the handler was instantiated. The respective information can be easily extracted from the complete overview. Moreover, I think we never used these endpoints in the first place. ---
[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
[ https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174974#comment-16174974 ] Aljoscha Krettek commented on FLINK-7623: - I think that would be incorrect, because then it would also report "not restored" in case the operator had no state assigned but (globally) was still restored, i.e. some other parallel instances of this operator might have state. > Detecting whether an operator is restored doesn't work with chained state > - > > Key: FLINK-7623 > URL: https://issues.apache.org/jira/browse/FLINK-7623 > Project: Flink > Issue Type: Bug > Components: DataStream API, State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > Attachments: StreamingJob.java > > > Originally reported on the ML: > https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E > If we have a chain of operators where multiple of them have operator state, > detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) > does not work correctly. It's best exemplified using this minimal example > where both the source and the flatMap have state: > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env > .addSource(new MaSource()).uid("source-1") > .flatMap(new MaFlatMap()).uid("flatMap-1"); > env.execute("testing"); > {code} > If I do a savepoint with these UIDs, then change "source-1" to "source-2" and > restore from the savepoint {{context.isRestored()}} still reports {{true}} > for the source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174972#comment-16174972 ] ASF GitHub Bot commented on FLINK-7638: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140281028 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); --- End diff -- "running" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING` > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174973#comment-16174973 ] ASF GitHub Bot commented on FLINK-7638: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140281116 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); --- End diff -- "finished" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED` > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140281116 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); --- End diff -- "finished" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED` ---
[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140280832 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); - for (JobDetails detail : result.getFinishedJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getFinished()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); } else { --- End diff -- On a second look, I think this response variant is not covered by the ported handler. ---
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174969#comment-16174969 ] ASF GitHub Bot commented on FLINK-7638: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140280832 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); - for (JobDetails detail : result.getRunningJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getRunning()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); - for (JobDetails detail : result.getFinishedJobs()) { - writeJobDetailOverviewAsJson(detail, gen, now); + for (JobDetails detail : result.getFinished()) { + jobDetailsSerializer.serialize(detail, gen, null); } gen.writeEndArray(); } else { --- End diff -- On a second look, I think this response variant is not covered by the ported handler. > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4688#discussion_r140281028 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java --- @@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler( StringWriter writer = new StringWriter(); try { JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer(); + gen.writeStartObject(); if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); --- End diff -- "running" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING` ---
[jira] [Commented] (FLINK-7439) Support variable arguments for UDTF in SQL
[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174953#comment-16174953 ] ASF GitHub Bot commented on FLINK-7439: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4536#discussion_r140276174 --- Diff: docs/dev/table/udfs.md --- @@ -297,7 +297,7 @@ optionally implemented. While some of these methods allow the system more effici - `merge()` is required for many batch aggreagtions and session window aggregations. - `resetAccumulator()` is required for many batch aggregations. -All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define a table function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. --- End diff -- "The method `accumulate` can be overloaded with different custom types and arguments and also support variable arguments." -> "The method `accumulate` can be overloaded with different parameter types and supports variable arguments." > Support variable arguments for UDTF in SQL > -- > > Key: FLINK-7439 > URL: https://issues.apache.org/jira/browse/FLINK-7439 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Currently, both UDF and UDAF support variable parameters, but UDTF not. > FLINK-5882 supports variable UDTF for Table API only, but missed SQL. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4536#discussion_r140276174 --- Diff: docs/dev/table/udfs.md --- @@ -297,7 +297,7 @@ optionally implemented. While some of these methods allow the system more effici - `merge()` is required for many batch aggreagtions and session window aggregations. - `resetAccumulator()` is required for many batch aggregations. -All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define a table function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. --- End diff -- "The method `accumulate` can be overloaded with different custom types and arguments and also support variable arguments." -> "The method `accumulate` can be overloaded with different parameter types and supports variable arguments." ---
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174926#comment-16174926 ] ASF GitHub Bot commented on FLINK-7378: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140271306 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws IOException { assertEquals(2, first.getNumBuffers()); assertEquals(2, second.getNumBuffers()); - String msg = "Wrong number of available segments after create buffer pool and request segments."; + String msg = "Wrong number of available segments after creating buffer pool and requesting segments."; --- End diff -- still "buffer pool**s**" > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}} directly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r140271306 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java --- @@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws IOException { assertEquals(2, first.getNumBuffers()); assertEquals(2, second.getNumBuffers()); - String msg = "Wrong number of available segments after create buffer pool and request segments."; + String msg = "Wrong number of available segments after creating buffer pool and requesting segments."; --- End diff -- still "buffer pool**s**" ---
[jira] [Created] (FLINK-7663) Allow AbstractRestHandler to handle bad requests
Till Rohrmann created FLINK-7663: Summary: Allow AbstractRestHandler to handle bad requests Key: FLINK-7663 URL: https://issues.apache.org/jira/browse/FLINK-7663 Project: Flink Issue Type: Bug Components: REST Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann The {{AbstractRestHandler}} parses the request and tries to generate a {{HandlerRequest}}. If this fails, then the server answers with an internal server error. Instead we should allow the {{AbstractRestHandler}} to be able to return a BAD_REQUEST status code. In order to do that, I would like to introduce a {{HandlerRequestException}} which can be thrown while creating the {{HandlerRequest}}. If this exception is thrown, then we return an error message with {{BAD_REQUEST}} {{HttpResponseStatus}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174882#comment-16174882 ] ASF GitHub Bot commented on FLINK-6233: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140266297 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. */ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeS
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140266297 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. */ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174880#comment-16174880 ] ASF GitHub Bot commented on FLINK-7647: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4691 Sorry about that, local Travis tests pass, should be fine now. > Port JobManagerConfigHandler to new REST endpoint > - > > Key: FLINK-7647 > URL: https://issues.apache.org/jira/browse/FLINK-7647 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobManagerConfigHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler to new ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4691 Sorry about that, local Travis tests pass, should be fine now. ---
[jira] [Assigned] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-7652: -- Assignee: Tzu-Li (Gordon) Tai > Port CurrentJobIdsHandler to new REST endpoint > -- > > Key: FLINK-7652 > URL: https://issues.apache.org/jira/browse/FLINK-7652 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CurrentJobIdsHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7648) Port TaskManagersHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-7648: -- Assignee: (was: Tzu-Li (Gordon) Tai) > Port TaskManagersHandler to new REST endpoint > - > > Key: FLINK-7648 > URL: https://issues.apache.org/jira/browse/FLINK-7648 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{TaskManagersHandler}} to the new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174875#comment-16174875 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140263201 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negati
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140263201 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +
[jira] [Created] (FLINK-7662) Remove unnecessary packaged licenses
Stephan Ewen created FLINK-7662: --- Summary: Remove unnecessary packaged licenses Key: FLINK-7662 URL: https://issues.apache.org/jira/browse/FLINK-7662 Project: Flink Issue Type: Improvement Components: Build System Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.4.0 With the new shading approach, we no longer shade ASM into Flink artifacts, so we do not need to package the ASM license into those artifacts any more. Instead, a shaded ASM artifact already containing a packaged license is used in the distribution build. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140262200 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. */ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174868#comment-16174868 ] ASF GitHub Bot commented on FLINK-6233: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140262200 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. */ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativ
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/3511#discussion_r140258895 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java --- @@ -309,7 +309,7 @@ public boolean write(T record) throws IOException { // Access Utilities // - private long readPointer(int logicalPosition) { + protected long readPointer(int logicalPosition) { --- End diff -- `protected final` with a comment why is it so should be ok. ---
[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/3511#discussion_r140258675 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java --- @@ -47,61 +47,61 @@ public static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8; - private static final int MIN_REQUIRED_BUFFERS = 3; + public static final int MIN_REQUIRED_BUFFERS = 3; - private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024; + public static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024; - private static final long LARGE_RECORD_TAG = 1L << 63; + public static final long LARGE_RECORD_TAG = 1L << 63; - private static final long POINTER_MASK = LARGE_RECORD_TAG - 1; + public static final long POINTER_MASK = LARGE_RECORD_TAG - 1; --- End diff -- Maybe put that into the comment inside the code? ---
[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message
[ https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174841#comment-16174841 ] ASF GitHub Bot commented on FLINK-7661: --- GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/4698 [FLINK-7661][network] Add credit field in PartitionRequest message ## What is the purpose of the change *`PartitionRequest` message adds the credit field which corresponds to the number of exclusive segments in `InputChannel`*. *This pull request is based on [4499](https://github.com/apache/flink/pull/4499) whose commit is also included for passing travis. Review the third commit for this PR change*. ## Brief change log - *Add credit field in `PartitionRequest` message* - *Add `getInitialCredit()` method in `RemoteInputChannel`* ## Verifying this change This change is already covered by existing tests, such as *NettyMessageSerializationTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhijiangW/flink FLINK-7661 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4698.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4698 commit 693303c5d720fd720865f60aaf9a59f4bf547396 Author: Zhijiang Date: 2017-08-07T09:31:17Z [FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for the floating buffers commit 4c582cba58dbdfb0485b3d23375a62428a160b2b Author: Zhijiang Date: 2017-08-14T06:30:47Z [FLINK-7394][core] Manage exclusive buffers in RemoteInputChannel commit b9e0447202a7621adf8bd8646e7d2a54dca00f2d Author: Zhijiang Date: 2017-09-21T08:28:16Z [FLINK-7661][network] Add credit field in PartitionRequest message > Add credit field in PartitionRequest message > > > Key: FLINK-7661 > URL: https://issues.apache.org/jira/browse/FLINK-7661 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: zhijiang >Assignee: zhijiang > > Currently the {{PartitionRequest}} message contains {{ResultPartitionID}} | > {{queueIndex}} | {{InputChannelID}} fields. > We will add a new {{credit}} field indicating the initial credit of > {{InputChannel}}, and this info can be got from {{InputChannel}} directly > after assigning exclusive buffers to it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4698: [FLINK-7661][network] Add credit field in Partitio...
GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/4698 [FLINK-7661][network] Add credit field in PartitionRequest message ## What is the purpose of the change *`PartitionRequest` message adds the credit field which corresponds to the number of exclusive segments in `InputChannel`*. *This pull request is based on [4499](https://github.com/apache/flink/pull/4499) whose commit is also included for passing travis. Review the third commit for this PR change*. ## Brief change log - *Add credit field in `PartitionRequest` message* - *Add `getInitialCredit()` method in `RemoteInputChannel`* ## Verifying this change This change is already covered by existing tests, such as *NettyMessageSerializationTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhijiangW/flink FLINK-7661 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4698.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4698 commit 693303c5d720fd720865f60aaf9a59f4bf547396 Author: Zhijiang Date: 2017-08-07T09:31:17Z [FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for the floating buffers commit 4c582cba58dbdfb0485b3d23375a62428a160b2b Author: Zhijiang Date: 2017-08-14T06:30:47Z [FLINK-7394][core] Manage exclusive buffers in RemoteInputChannel commit b9e0447202a7621adf8bd8646e7d2a54dca00f2d Author: Zhijiang Date: 2017-09-21T08:28:16Z [FLINK-7661][network] Add credit field in PartitionRequest message ---
[GitHub] flink pull request #4695: [FLINK-7661][network] Add credit field in Partitio...
Github user zhijiangW closed the pull request at: https://github.com/apache/flink/pull/4695 ---
[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message
[ https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174839#comment-16174839 ] ASF GitHub Bot commented on FLINK-7661: --- Github user zhijiangW closed the pull request at: https://github.com/apache/flink/pull/4695 > Add credit field in PartitionRequest message > > > Key: FLINK-7661 > URL: https://issues.apache.org/jira/browse/FLINK-7661 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: zhijiang >Assignee: zhijiang > > Currently the {{PartitionRequest}} message contains {{ResultPartitionID}} | > {{queueIndex}} | {{InputChannelID}} fields. > We will add a new {{credit}} field indicating the initial credit of > {{InputChannel}}, and this info can be got from {{InputChannel}} directly > after assigning exclusive buffers to it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140255052 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative.") + } + + /** +* Get the maximum interval between receiving a row and emitting it (as part of a joined result). +* Only reasonable for row time join. +* +* @return the maximum delay for the outputs +
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174834#comment-16174834 ] ASF GitHub Bot commented on FLINK-6233: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140255052 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.join + +import java.util.{ArrayList, List => JList} + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A CoProcessFunction to execute time-bounded stream inner-join. + * Two kinds of time criteria: + * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X". + * + * @param leftLowerBound the lower bound for the left stream (X in the criteria) + * @param leftUpperBound the upper bound for the left stream (Y in the criteria) + * @param allowedLateness the lateness allowed for the two streams + * @param leftTypethe input type of left stream + * @param rightType the input type of right stream + * @param genJoinFuncName the function code of other non-equi conditions + * @param genJoinFuncCode the function name of other non-equi conditions + * @param timeIndicator indicate whether joining on proctime or rowtime + * + */ +abstract class TimeBoundedStreamInnerJoin( +private val leftLowerBound: Long, +private val leftUpperBound: Long, +private val allowedLateness: Long, +private val leftType: TypeInformation[Row], +private val rightType: TypeInformation[Row], +private val genJoinFuncName: String, +private val genJoinFuncCode: String, +private val leftTimeIdx: Int, +private val rightTimeIdx: Int, +private val timeIndicator: JoinTimeIndicator) +extends CoProcessFunction[CRow, CRow, CRow] +with Compiler[FlatJoinFunction[Row, Row, Row]] +with Logging { + + private var cRowWrapper: CRowWrappingCollector = _ + + // the join function for other conditions + private var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + // cache to store rows from the left stream + private var leftCache: MapState[Long, JList[Row]] = _ + // cache to store rows from the right stream + private var rightCache: MapState[Long, JList[Row]] = _ + + // state to record the timer on the left stream. 0 means no timer set + private var leftTimerState: ValueState[Long] = _ + // state to record the timer on the right stream. 0 means no timer set + private var rightTimerState: ValueState[Long] = _ + + private val leftRelativeSize: Long = -leftLowerBound + private val rightRelativeSize: Long = leftUpperBound + + protected var leftOperatorTime: Long = 0L + protected var rightOperatorTime: Long = 0L + + //For delayed cleanup + private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2 + + if (allowedLateness < 0) { +throw new IllegalArgumentException("The allowed lateness must be non-negative
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174815#comment-16174815 ] ASF GitHub Bot commented on FLINK-6233: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140251765 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. */ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeS
[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r140251765 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinLeftCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", -BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( +timeIndicator + "InnerJoinRightCache", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], +rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", -classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** -* Process records from the left stream. -* -* @param cRowValue the input record -* @param ctx the context to register timer or get current time -* @param out the collector for outputting results -* +* Process rows from the left stream. */ override def processElement1( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - rightRelativeSize +val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** -* Process records from the right stream. -* -* @param cRowValue the input record -* @param ctx the context to get current time -* @param out the collector for outputting results -* +* Process rows from the right stream. */ override def processElement2( -cRowValue: CRow, -ctx: CoProcessFunction[CRow, CRow, CRow]#Context, -out: Collector[CRow]): Unit = { -val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) -getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { +updateOperatorTime(ctx) +val rowTime: Long = getTimeForRightStream(ctx, cRowValue) +val oppositeLowerBound: Long = rowTime - leftRelativeSize +val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime,
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174757#comment-16174757 ] ASF GitHub Bot commented on FLINK-7378: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4485 @NicoK , sorry for the typos. I have submitted the updates. > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}} directly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4485 @NicoK , sorry for the typos. I have submitted the updates. ---
[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174746#comment-16174746 ] ASF GitHub Bot commented on FLINK-7638: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4688 @tillrohrmann Yes that makes sense, lets do that separately in the future then. +1. > Port CurrentJobsOverviewHandler to new REST endpoint > > > Key: FLINK-7638 > URL: https://issues.apache.org/jira/browse/FLINK-7638 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to n...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4688 @tillrohrmann Yes that makes sense, lets do that separately in the future then. +1. ---
[jira] [Commented] (FLINK-7502) PrometheusReporter improvements
[ https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174734#comment-16174734 ] ASF GitHub Bot commented on FLINK-7502: --- Github user mbode commented on the issue: https://github.com/apache/flink/pull/4586 @zentol It would be great if you could have another look on occasion, I added better handling for metrics that are registered e.g. by different subtasks. [green travis](https://travis-ci.org/mbode/flink/builds/274685138) > PrometheusReporter improvements > --- > > Key: FLINK-7502 > URL: https://issues.apache.org/jira/browse/FLINK-7502 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > > * do not throw exceptions on metrics being registered for second time > * allow port ranges for setups where multiple reporters are on same host > (e.g. one TaskManager and one JobManager) > * do not use nanohttpd anymore, there is now a minimal http server included > in [Prometheus JVM client|https://github.com/prometheus/client_java] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter
Github user mbode commented on the issue: https://github.com/apache/flink/pull/4586 @zentol It would be great if you could have another look on occasion, I added better handling for metrics that are registered e.g. by different subtasks. [green travis](https://travis-ci.org/mbode/flink/builds/274685138) ---
[jira] [Assigned] (FLINK-7649) Port JobStoppingHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7649: Assignee: Till Rohrmann > Port JobStoppingHandler to new REST endpoint > > > Key: FLINK-7649 > URL: https://issues.apache.org/jira/browse/FLINK-7649 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing `JobStoppingHandler` to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7649) Port JobStoppingHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7649: - Description: Port existing {{JobStoppingHandler}} to new REST endpoint (was: Port existing `JobStoppingHandler` to new REST endpoint) > Port JobStoppingHandler to new REST endpoint > > > Key: FLINK-7649 > URL: https://issues.apache.org/jira/browse/FLINK-7649 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobStoppingHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174731#comment-16174731 ] ASF GitHub Bot commented on FLINK-7647: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4691 Still failing with `ClusterConfigHandlerTest.testGetPaths:32 NullPointer` > Port JobManagerConfigHandler to new REST endpoint > - > > Key: FLINK-7647 > URL: https://issues.apache.org/jira/browse/FLINK-7647 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{JobManagerConfigHandler}} to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)