[GitHub] flink pull request #4704: [hotfix] [docs] Update function name in DataStream...

2017-09-21 Thread phiradet
GitHub user phiradet opened a pull request:

https://github.com/apache/flink/pull/4704

[hotfix] [docs] Update function name in DataStream API

Currently, the `javaStream` should be used to get the underlying java 
DataStream object -- not `getJavaStream`



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/phiradet/flink hotfix-doc-datastream_api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4704.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4704


commit b7f9544fe65adbab88ddc2869f8d6a523291e938
Author: Bang, Phiradet 
Date:   2017-09-22T06:35:37Z

To get the underlying java DataStream object, the javaStream should be 
used, not getJavaStream




---


[jira] [Commented] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp

2017-09-21 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16176009#comment-16176009
 ] 

Bowen Li commented on FLINK-7388:
-

Interesting one. I just got some experience on ProcessFunction and would like 
to re-enforce my understanding of it

> ProcessFunction.onTimer() sets processing time as timestamp
> ---
>
> Key: FLINK-7388
> URL: https://issues.apache.org/jira/browse/FLINK-7388
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Bowen Li
>
> The {{ProcessFunction.onTimer()}} method sets the current processing time as 
> event-time timestamp when it is called from a processing time timer.
> I don't think this behavior is useful. Processing time timestamps won't be 
> aligned with watermarks and are not deterministic. The only reason would be 
> to have _some_ value in the timestamp field. However, the behavior is very 
> subtle and might not be noticed by users.
> IMO, it would be better to erase the timestamp. This will cause downstream 
> operator that rely on timestamps to fail and notify the users that the logic 
> they implemented was probably not what they intended to do.
> What do you think [~aljoscha]?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7388) ProcessFunction.onTimer() sets processing time as timestamp

2017-09-21 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7388:
---

Assignee: Bowen Li

> ProcessFunction.onTimer() sets processing time as timestamp
> ---
>
> Key: FLINK-7388
> URL: https://issues.apache.org/jira/browse/FLINK-7388
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Bowen Li
>
> The {{ProcessFunction.onTimer()}} method sets the current processing time as 
> event-time timestamp when it is called from a processing time timer.
> I don't think this behavior is useful. Processing time timestamps won't be 
> aligned with watermarks and are not deterministic. The only reason would be 
> to have _some_ value in the timestamp field. However, the behavior is very 
> subtle and might not be noticed by users.
> IMO, it would be better to erase the timestamp. This will cause downstream 
> operator that rely on timestamps to fail and notify the users that the logic 
> they implemented was probably not what they intended to do.
> What do you think [~aljoscha]?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7433) Lack of synchronization accessing inProgress in JobCancellationWithSavepointHandlers

2017-09-21 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175981#comment-16175981
 ] 

Bowen Li commented on FLINK-7433:
-

[~tedyu] Hi Ted, this seems to be a dup of FLINK-7659. I've submitted a PR for 
FLINK-7659, and will close this one

> Lack of synchronization accessing inProgress in 
> JobCancellationWithSavepointHandlers
> 
>
> Key: FLINK-7433
> URL: https://issues.apache.org/jira/browse/FLINK-7433
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   try {
> if (throwable != null) {
>   completed.put(requestId, throwable);
> } else {
>   completed.put(requestId, path);
> }
>   } finally {
> inProgress.remove(jobId);
>   }
> {code}
> The call to inProgress.remove(jobId) should be protected by lock object.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7433) Lack of synchronization accessing inProgress in JobCancellationWithSavepointHandlers

2017-09-21 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-7433.
---
Resolution: Duplicate

> Lack of synchronization accessing inProgress in 
> JobCancellationWithSavepointHandlers
> 
>
> Key: FLINK-7433
> URL: https://issues.apache.org/jira/browse/FLINK-7433
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   try {
> if (throwable != null) {
>   completed.put(requestId, throwable);
> } else {
>   completed.put(requestId, path);
> }
>   } finally {
> inProgress.remove(jobId);
>   }
> {code}
> The call to inProgress.remove(jobId) should be protected by lock object.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7592) How to close the output log like "Connected to JobManager at Actor" and "Job execution switched to status RUNNING."?

2017-09-21 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175977#comment-16175977
 ] 

Bowen Li commented on FLINK-7592:
-

Hi, are you requesting a feature or asking this specific question? If it's a 
feature request, can you please re-phrase its description? If you are asking 
questions, please use Flink's mailing list. JIRA is for keeping track of tasks, 
features, issues, and bugs.

> How to close the output log like "Connected to JobManager at Actor" and "Job 
> execution switched to status RUNNING."?
> 
>
> Key: FLINK-7592
> URL: https://issues.apache.org/jira/browse/FLINK-7592
> Project: Flink
>  Issue Type: Wish
>  Components: Configuration
>Affects Versions: 1.3.2
> Environment: jdk 1.8
>Reporter: brighteast
>  Labels: features
> Fix For: 1.3.2
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1060959513] 
> with leader session id 59ff044a-ab2b-4518-b7b8-8352da361511.
> 09/06/2017 15:06:35   Job execution switched to status RUNNING.
> 09/06/2017 15:06:35   CHAIN DataSource (at 
> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601)
>  (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> 
> Combine(SUM(1))(1/1) switched to SCHEDULED 
> 09/06/2017 15:06:35   CHAIN DataSource (at 
> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601)
>  (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> 
> Combine(SUM(1))(1/1) switched to DEPLOYING 
> 09/06/2017 15:06:35   CHAIN DataSource (at 
> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601)
>  (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> 
> Combine(SUM(1))(1/1) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(2/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(1/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(3/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(4/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(5/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(6/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(7/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(8/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(4/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(8/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(7/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(6/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(3/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(1/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(5/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   CHAIN DataSource (at 
> org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:601)
>  (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (FlatMap at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:10)) -> Map (Map at 
> flinkTest$.delayedEndpoint$flinkTest$1(flinkTest.scala:11)) -> 
> Combine(SUM(1))(1/1) switched to FINISHED 
> 09/06/2017 15:06:35   Reduce (SUM(1))(2/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(5/8) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(8/8) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(7/8) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(2/8) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(3/8) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(6/8) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(4/8) switched to RUNNING 
> 09/06/2017 15:06:35   Reduce (SUM(1))(1/8) switched to RUNNING 
> 09/06/2017 15:06:35   DataSink (collect())(1/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   DataSink (collect())(8/8) switched to SCHEDULED 
> 09/06/2017 15:06:35   DataSink (collect())(8/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   DataSink (collect())(1/8) switched to DEPLOYING 
> 09/06/2017 15:06:35   DataSink (collect())(4/8) switched to SCHEDULED 

[jira] [Commented] (FLINK-5726) Add the RocketMQ plugin for the Apache Flink

2017-09-21 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175963#comment-16175963
 ] 

Bowen Li commented on FLINK-5726:
-

[~longda] Hi Longda, I'm curious that is Alibaba's BLINK using RocketMQ or 
Kafka?

> Add the RocketMQ plugin for the Apache Flink
> 
>
> Key: FLINK-5726
> URL: https://issues.apache.org/jira/browse/FLINK-5726
> Project: Flink
>  Issue Type: Task
>  Components: Streaming Connectors
>Reporter: Longda Feng
>Priority: Minor
>
> Apache RocketMQ® is an open source distributed messaging and streaming data 
> platform. It has been used in a lot of companies. Please refer to 
> http://rocketmq.incubator.apache.org/ for more details.
> Since the Apache RocketMq 4.0 will be released in the next few days, we can 
> start the job of adding the RocketMq plugin for the Apache Flink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175953#comment-16175953
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I missed that message of `verifyAllBuffersReturned()` issue before.
I have submitted the modifications of it. :)


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-21 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , I missed that message of `verifyAllBuffersReturned()` issue before.
I have submitted the modifications of it. :)


---


[GitHub] flink pull request #4703: [FLINK-7659][REST] Unprotected access to inProgres...

2017-09-21 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4703

[FLINK-7659][REST] Unprotected access to inProgress in 
JobCancellationWithSavepointHandlers#handleNewRequest

## What is the purpose of the change

There's a thread accessing `inProgress` without proper synchronization

## Brief change log

Add synchronization to access `inProgress` 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7659

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4703.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4703


commit 6b7904bcfe809542216e12e4b48933fcb47d2d1d
Author: Bowen Li 
Date:   2017-09-22T05:33:17Z

FLINK-7659 Unprotected access to inProgress in 
JobCancellationWithSavepointHandlers#handleNewRequest




---


[jira] [Commented] (FLINK-7659) Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175942#comment-16175942
 ] 

ASF GitHub Bot commented on FLINK-7659:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4703

[FLINK-7659][REST] Unprotected access to inProgress in 
JobCancellationWithSavepointHandlers#handleNewRequest

## What is the purpose of the change

There's a thread accessing `inProgress` without proper synchronization

## Brief change log

Add synchronization to access `inProgress` 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7659

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4703.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4703


commit 6b7904bcfe809542216e12e4b48933fcb47d2d1d
Author: Bowen Li 
Date:   2017-09-22T05:33:17Z

FLINK-7659 Unprotected access to inProgress in 
JobCancellationWithSavepointHandlers#handleNewRequest




> Unprotected access to inProgress in 
> JobCancellationWithSavepointHandlers#handleNewRequest
> -
>
> Key: FLINK-7659
> URL: https://issues.apache.org/jira/browse/FLINK-7659
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Reporter: Ted Yu
>Assignee: Bowen Li
>
> Here is related code:
> {code}
>   } finally {
> inProgress.remove(jobId);
>   }
> {code}
> A little lower, in another finally block, there is:
> {code}
>   synchronized (lock) {
> if (!success) {
>   inProgress.remove(jobId);
> {code}
> which is correct.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7659) Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest

2017-09-21 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7659:

Component/s: REST

> Unprotected access to inProgress in 
> JobCancellationWithSavepointHandlers#handleNewRequest
> -
>
> Key: FLINK-7659
> URL: https://issues.apache.org/jira/browse/FLINK-7659
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Reporter: Ted Yu
>Assignee: Bowen Li
>
> Here is related code:
> {code}
>   } finally {
> inProgress.remove(jobId);
>   }
> {code}
> A little lower, in another finally block, there is:
> {code}
>   synchronized (lock) {
> if (!success) {
>   inProgress.remove(jobId);
> {code}
> which is correct.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7659) Unprotected access to inProgress in JobCancellationWithSavepointHandlers#handleNewRequest

2017-09-21 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7659:
---

Assignee: Bowen Li

> Unprotected access to inProgress in 
> JobCancellationWithSavepointHandlers#handleNewRequest
> -
>
> Key: FLINK-7659
> URL: https://issues.apache.org/jira/browse/FLINK-7659
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Bowen Li
>
> Here is related code:
> {code}
>   } finally {
> inProgress.remove(jobId);
>   }
> {code}
> A little lower, in another finally block, there is:
> {code}
>   synchronized (lock) {
> if (!success) {
>   inProgress.remove(jobId);
> {code}
> which is correct.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175888#comment-16175888
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4702
  
@aljoscha @chenqin Hi guys, can you please take a look? I'll write doc 
about this feature with 
[FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660).  BTW, this is 
my first time writing Scala


> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is good warm up task for ppl to learn how window 
> function works in general. Otherwise feel free to assign back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4702: [FLINK-7635][DataStream API][Scala API] Support sideOutpu...

2017-09-21 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4702
  
@aljoscha @chenqin Hi guys, can you please take a look? I'll write doc 
about this feature with 
[FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660).  BTW, this is 
my first time writing Scala


---


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175800#comment-16175800
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4702

[FLINK-7635][DataStream API][Scala API] Support sideOutput in 
ProcessWindowFunciton

## What is the purpose of the change

Add support for sideOutput in ProcessWindowFunciton

## Verifying this change

This change added tests and can be verified as follows:

 - Added integration tests for DataStream API in 
`flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java`
 - Added integration tests for Scala API in 

`flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala`

## Does this pull request potentially affect one of the following parts:

- The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (I will add document for this 
feature and [FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660) 
together when implementing FLINK-7660)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7635

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4702.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4702


commit 6386983239bd3024b395c865ec4fd33e232ca5a3
Author: Bowen Li 
Date:   2017-08-30T16:35:03Z

FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in 
flink-connector-kinesis

commit 381cd4156b84673a1d32d2db3f7b2d748d90d980
Author: Bowen Li 
Date:   2017-09-07T06:33:37Z

Merge remote-tracking branch 'upstream/master'

commit dcf40bd821187b848d924f7f4df6805b1b924c16
Author: Bowen Li 
Date:   2017-09-15T18:00:03Z

Merge remote-tracking branch 'upstream/master'

commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338
Author: Bowen Li 
Date:   2017-09-18T06:25:26Z

Merge remote-tracking branch 'upstream/master'

commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e
Author: Bowen Li 
Date:   2017-09-18T23:50:48Z

Merge remote-tracking branch 'upstream/master'

commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734
Author: Bowen Li 
Date:   2017-09-19T17:18:54Z

Merge remote-tracking branch 'upstream/master'

commit fb88d61579cbc602b21f96ffca5c189aa846bec1
Author: Bowen Li 
Date:   2017-09-20T05:56:14Z

Merge branch 'master' of github.com:bowenli86/flink

commit 3538421e6e8d194fe838f662480f44b57a41e5e2
Author: Bowen Li 
Date:   2017-09-20T06:35:34Z

add full path of WindowWordCount to doc to remove warning from IDE

commit e08f099465f495401efad1a4d8f54a19735df76c
Author: Bowen Li 
Date:   2017-09-20T18:08:06Z

format code and doc

commit 83fca0651f8d342aea83291bfe4d45bc8df646d2
Author: Bowen Li 
Date:   2017-09-21T19:13:17Z

update window doc

commit a90e18102e8f85d305cd5f2f077db202f471023b
Author: Bowen Li 
Date:   2017-09-22T00:56:03Z

FLINK-7635 Support sideOutput in ProcessWindowFunciton

commit 76e22afa3030db9dd555608168f7cf408696d1a7
Author: Bowen Li 
Date:   2017-09-22T01:07:02Z

add output to InternalProcessApplyWindowContext

commit 3da7ce9c439fb58c898f34acc87e11621df68947
Author: Bowen Li 
Date:   2017-09-22T01:57:03Z

add scala API




> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is good warm up task for ppl to learn how window 
> function works in general. Otherwise feel free to assign back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...

2017-09-21 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4702

[FLINK-7635][DataStream API][Scala API] Support sideOutput in 
ProcessWindowFunciton

## What is the purpose of the change

Add support for sideOutput in ProcessWindowFunciton

## Verifying this change

This change added tests and can be verified as follows:

 - Added integration tests for DataStream API in 
`flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java`
 - Added integration tests for Scala API in 

`flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala`

## Does this pull request potentially affect one of the following parts:

- The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (I will add document for this 
feature and [FLINK-7660](https://issues.apache.org/jira/browse/FLINK-7660) 
together when implementing FLINK-7660)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7635

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4702.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4702


commit 6386983239bd3024b395c865ec4fd33e232ca5a3
Author: Bowen Li 
Date:   2017-08-30T16:35:03Z

FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in 
flink-connector-kinesis

commit 381cd4156b84673a1d32d2db3f7b2d748d90d980
Author: Bowen Li 
Date:   2017-09-07T06:33:37Z

Merge remote-tracking branch 'upstream/master'

commit dcf40bd821187b848d924f7f4df6805b1b924c16
Author: Bowen Li 
Date:   2017-09-15T18:00:03Z

Merge remote-tracking branch 'upstream/master'

commit 169ea0a3bee1ba315d39fa49c16e9bd7c71d1338
Author: Bowen Li 
Date:   2017-09-18T06:25:26Z

Merge remote-tracking branch 'upstream/master'

commit 659e91c18ade8eb65d355b5b85ae2d402a61ff5e
Author: Bowen Li 
Date:   2017-09-18T23:50:48Z

Merge remote-tracking branch 'upstream/master'

commit 990c4648a1427ca7c3c27453fe2a40cd5cac3734
Author: Bowen Li 
Date:   2017-09-19T17:18:54Z

Merge remote-tracking branch 'upstream/master'

commit fb88d61579cbc602b21f96ffca5c189aa846bec1
Author: Bowen Li 
Date:   2017-09-20T05:56:14Z

Merge branch 'master' of github.com:bowenli86/flink

commit 3538421e6e8d194fe838f662480f44b57a41e5e2
Author: Bowen Li 
Date:   2017-09-20T06:35:34Z

add full path of WindowWordCount to doc to remove warning from IDE

commit e08f099465f495401efad1a4d8f54a19735df76c
Author: Bowen Li 
Date:   2017-09-20T18:08:06Z

format code and doc

commit 83fca0651f8d342aea83291bfe4d45bc8df646d2
Author: Bowen Li 
Date:   2017-09-21T19:13:17Z

update window doc

commit a90e18102e8f85d305cd5f2f077db202f471023b
Author: Bowen Li 
Date:   2017-09-22T00:56:03Z

FLINK-7635 Support sideOutput in ProcessWindowFunciton

commit 76e22afa3030db9dd555608168f7cf408696d1a7
Author: Bowen Li 
Date:   2017-09-22T01:07:02Z

add output to InternalProcessApplyWindowContext

commit 3da7ce9c439fb58c898f34acc87e11621df68947
Author: Bowen Li 
Date:   2017-09-22T01:57:03Z

add scala API




---


[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)

2017-09-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175658#comment-16175658
 ] 

Till Rohrmann commented on FLINK-4319:
--

Hi [~suez1224],

thanks for testing out the current state of Flip-6. This is valuable feedback 
which we have to address.

1) It's true that we don't wait on the completion of the {{JobMaster}} shutdown 
in the {{JobManagerRunner}} yet. However, I thought that the 
{{JobClusterEntrypoint}} should call {{System.exit}} upon termination of the 
job (at least eventually). This might point towards a bug.
2) The RM stops TMs after they were idle for a certain time (10s). However, the 
stopping logic is not yet implemented for the {{YarnResourceManager}}. See 
YarnResourceManager.java:231. This should be fairly straight forward. See 
FLINK-7076 for more information.
3) We are currently working on porting the web dashboard to the new 
architecture. See FLINK-7530 for the current state.

If you like, then you could take over FLINK-7076 since we didn't make much 
progress there. Also FLINK-7530 could benefit from helping hands :-)


> Rework Cluster Management (FLIP-6)
> --
>
> Key: FLINK-4319
> URL: https://issues.apache.org/jira/browse/FLINK-4319
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> This is the root issue to track progress of the rework of cluster management 
> (FLIP-6) 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling

2017-09-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175656#comment-16175656
 ] 

Till Rohrmann commented on FLINK-7076:
--

Hi [~yuemeng], are you still working on this?

> Implement container release to support dynamic scaling
> --
>
> Key: FLINK-7076
> URL: https://issues.apache.org/jira/browse/FLINK-7076
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: yuemeng
>  Labels: flip-6
>
> In order to support dynamic scaling, the {{YarnResourceManager}} has to be 
> able to dynamically free containers. We have to implement the 
> {{YarnResourceManager#stopWorker}} method.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175640#comment-16175640
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140379775
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 ---
@@ -55,16 +59,19 @@
 public class DispatcherRestEndpoint extends RestServerEndpoint {
 
private final GatewayRetriever leaderRetriever;
+   private final Configuration clusterConfiguration;
private final RestHandlerConfiguration restConfiguration;
private final Executor executor;
 
public DispatcherRestEndpoint(
-   RestServerEndpointConfiguration configuration,
+   Configuration clusterConfiguration,
+   RestServerEndpointConfiguration endpointConfiguration,
--- End diff --

Shall we remove the order between `clusterConfiguration` and 
`endpointConfiguration`? I usually like to have the super class arguments first.


> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140380156
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
 ---
@@ -33,20 +43,27 @@
 /**
  * Returns the Job Manager's configuration.
  */
-public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
-
-   private static final String JOBMANAGER_CONFIG_REST_PATH = 
"/jobmanager/config";
+public class ClusterConfigHandler extends AbstractJsonRequestHandler
+   implements LegacyRestHandler {
 
private final Configuration config;
 
-   public JobManagerConfigHandler(Executor executor, Configuration config) 
{
+   public ClusterConfigHandler(Executor executor, Configuration config) {
super(executor);
-   this.config = config;
+   this.config = Preconditions.checkNotNull(config);
}
 
@Override
public String[] getPaths() {
-   return new String[]{JOBMANAGER_CONFIG_REST_PATH};
+   return new 
String[]{ClusterConfigurationHeaders.CLUSTER_CONFIG_REST_PATH};
+   }
+
+   @Override
+   public CompletableFuture handleRequest(
+   HandlerRequest request,
+   DispatcherGateway gateway) {
+
+   return CompletableFuture.supplyAsync(() -> 
ClusterConfiguration.from(config), executor);
--- End diff --

Maybe we could generate the `ClusterConfiguration` instance once at 
creation time and then always return this element as a 
`CompletableFuture.completedFuture(clusterConfiguration)`.


---


[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175639#comment-16175639
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140381187
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+
+/**
+ * Response of the {@link ClusterConfigHandler}, respresented as a list
+ * of key-value pairs of the cluster {@link Configuration}.
+ */
+public class ClusterConfiguration extends 
ArrayList implements ResponseBody {
--- End diff --

There is already another class called `ClusterConfiguration`. Maybe we 
should rename it in order to disambiguate it.


> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175638#comment-16175638
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140380156
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterConfigHandler.java
 ---
@@ -33,20 +43,27 @@
 /**
  * Returns the Job Manager's configuration.
  */
-public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
-
-   private static final String JOBMANAGER_CONFIG_REST_PATH = 
"/jobmanager/config";
+public class ClusterConfigHandler extends AbstractJsonRequestHandler
+   implements LegacyRestHandler {
 
private final Configuration config;
 
-   public JobManagerConfigHandler(Executor executor, Configuration config) 
{
+   public ClusterConfigHandler(Executor executor, Configuration config) {
super(executor);
-   this.config = config;
+   this.config = Preconditions.checkNotNull(config);
}
 
@Override
public String[] getPaths() {
-   return new String[]{JOBMANAGER_CONFIG_REST_PATH};
+   return new 
String[]{ClusterConfigurationHeaders.CLUSTER_CONFIG_REST_PATH};
+   }
+
+   @Override
+   public CompletableFuture handleRequest(
+   HandlerRequest request,
+   DispatcherGateway gateway) {
+
+   return CompletableFuture.supplyAsync(() -> 
ClusterConfiguration.from(config), executor);
--- End diff --

Maybe we could generate the `ClusterConfiguration` instance once at 
creation time and then always return this element as a 
`CompletableFuture.completedFuture(clusterConfiguration)`.


> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140381187
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+
+/**
+ * Response of the {@link ClusterConfigHandler}, respresented as a list
+ * of key-value pairs of the cluster {@link Configuration}.
+ */
+public class ClusterConfiguration extends 
ArrayList implements ResponseBody {
--- End diff --

There is already another class called `ClusterConfiguration`. Maybe we 
should rename it in order to disambiguate it.


---


[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175637#comment-16175637
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140382392
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+
+/**
+ * Response of the {@link ClusterConfigHandler}, respresented as a list
+ * of key-value pairs of the cluster {@link Configuration}.
+ */
+public class ClusterConfiguration extends 
ArrayList implements ResponseBody {
--- End diff --

I'm actually wondering why we don't extend from a `Map` implementation 
since it should behave more like a map. This of course would require changes on 
the web gui side. I think now, the JSON would like

```
[{"key":"keyvalue", "value":"valuevalue"}, {}] 
```
instead of
```
{"keyvalue":"valuevalue", }
```

Maybe we could change this in the future.


> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175641#comment-16175641
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140381079
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+
+/**
+ * Response of the {@link ClusterConfigHandler}, respresented as a list
+ * of key-value pairs of the cluster {@link Configuration}.
+ */
+public class ClusterConfiguration extends 
ArrayList implements ResponseBody {
--- End diff --

Serial version uid missing.


> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140382392
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+
+/**
+ * Response of the {@link ClusterConfigHandler}, respresented as a list
+ * of key-value pairs of the cluster {@link Configuration}.
+ */
+public class ClusterConfiguration extends 
ArrayList implements ResponseBody {
--- End diff --

I'm actually wondering why we don't extend from a `Map` implementation 
since it should behave more like a map. This of course would require changes on 
the web gui side. I think now, the JSON would like

```
[{"key":"keyvalue", "value":"valuevalue"}, {}] 
```
instead of
```
{"keyvalue":"valuevalue", }
```

Maybe we could change this in the future.


---


[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140381079
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfiguration.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+
+/**
+ * Response of the {@link ClusterConfigHandler}, respresented as a list
+ * of key-value pairs of the cluster {@link Configuration}.
+ */
+public class ClusterConfiguration extends 
ArrayList implements ResponseBody {
--- End diff --

Serial version uid missing.


---


[GitHub] flink pull request #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler ...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4691#discussion_r140379775
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 ---
@@ -55,16 +59,19 @@
 public class DispatcherRestEndpoint extends RestServerEndpoint {
 
private final GatewayRetriever leaderRetriever;
+   private final Configuration clusterConfiguration;
private final RestHandlerConfiguration restConfiguration;
private final Executor executor;
 
public DispatcherRestEndpoint(
-   RestServerEndpointConfiguration configuration,
+   Configuration clusterConfiguration,
+   RestServerEndpointConfiguration endpointConfiguration,
--- End diff --

Shall we remove the order between `clusterConfiguration` and 
`endpointConfiguration`? I usually like to have the super class arguments first.


---


[GitHub] flink pull request #4689: [FLINK-7655] [flip6] Set fencing token to null if ...

2017-09-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4689


---


[jira] [Commented] (FLINK-7655) Revisit default non-leader id for FencedRpcEndpoints

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175614#comment-16175614
 ] 

ASF GitHub Bot commented on FLINK-7655:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4689


> Revisit default non-leader id for FencedRpcEndpoints
> 
>
> Key: FLINK-7655
> URL: https://issues.apache.org/jira/browse/FLINK-7655
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Currently, when a {{FencedRpcEndpoint}} loses leadership, we set its leader 
> id to a random value. This can be problematic, even though it's unlikely, 
> because we might set it to a value which is used somewhere else (e.g. the 
> currently valid leader id). I think it would  be better to simply set the 
> leader id to {{null}} in order to properly encode that the 
> {{FencedRpcEndpoint}} is no longer a leader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7655) Revisit default non-leader id for FencedRpcEndpoints

2017-09-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7655.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 42cc3a2a9c41dda7cf338db36b45131db9150674

> Revisit default non-leader id for FencedRpcEndpoints
> 
>
> Key: FLINK-7655
> URL: https://issues.apache.org/jira/browse/FLINK-7655
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Currently, when a {{FencedRpcEndpoint}} loses leadership, we set its leader 
> id to a random value. This can be problematic, even though it's unlikely, 
> because we might set it to a value which is used somewhere else (e.g. the 
> currently valid leader id). I think it would  be better to simply set the 
> leader id to {{null}} in order to properly encode that the 
> {{FencedRpcEndpoint}} is no longer a leader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7655) Revisit default non-leader id for FencedRpcEndpoints

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175609#comment-16175609
 ] 

ASF GitHub Bot commented on FLINK-7655:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4689
  
Travis passed. Merging this PR.


> Revisit default non-leader id for FencedRpcEndpoints
> 
>
> Key: FLINK-7655
> URL: https://issues.apache.org/jira/browse/FLINK-7655
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Currently, when a {{FencedRpcEndpoint}} loses leadership, we set its leader 
> id to a random value. This can be problematic, even though it's unlikely, 
> because we might set it to a value which is used somewhere else (e.g. the 
> currently valid leader id). I think it would  be better to simply set the 
> leader id to {{null}} in order to properly encode that the 
> {{FencedRpcEndpoint}} is no longer a leader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4689: [FLINK-7655] [flip6] Set fencing token to null if not lea...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4689
  
Travis passed. Merging this PR.


---


[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)

2017-09-21 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175494#comment-16175494
 ] 

Shuyi Chen commented on FLINK-4319:
---

Hi [~till.rohrmann], thanks a lot for the pointers.
I was trying out the current status of Flip-6 flink-on-yarn. I found several 
problems:
1) JobMaster wont get killed after the job is finished. It seems the JobMaster 
never received the SHUTDOWN akka message. Maybe it's related to 
https://issues.apache.org/jira/browse/FLINK-5176?
2) The TaskManager wont get released after the job died. Maybe this is related 
to 1).
3) The web dashboard is no longer accessible. 

Can you shed some lights on these issues? I would be happy to help if needed. 
Thanks.

> Rework Cluster Management (FLIP-6)
> --
>
> Key: FLINK-4319
> URL: https://issues.apache.org/jira/browse/FLINK-4319
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> This is the root issue to track progress of the rework of cluster management 
> (FLIP-6) 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks

2017-09-21 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-7666:
--

 Summary: ContinuousFileReaderOperator swallows chained watermarks
 Key: FLINK-7666
 URL: https://issues.apache.org/jira/browse/FLINK-7666
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Affects Versions: 1.3.2
Reporter: Ufuk Celebi


I use event time and read from a (finite) file. I assign watermarks right after 
the {{ContinuousFileReaderOperator}} with parallelism 1.

{code}
env
  .readFile(new TextInputFormat(...), ...)
  .setParallelism(1)
  .assignTimestampsAndWatermarks(...)
  .setParallelism(1)
  .map()...
{code}

The watermarks I assign never progress through the pipeline.

I can work around this by inserting a {{shuffle()}} after the file reader or 
starting a new chain at the assigner:
{code}
env
  .readFile(new TextInputFormat(...), ...)
  .setParallelism(1)
  .shuffle() 
  .assignTimestampsAndWatermarks(...)
  .setParallelism(1)
  .map()...
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7665) Use wait/notify in ContinuousFileReaderOperator

2017-09-21 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-7665:
--

 Summary: Use wait/notify in ContinuousFileReaderOperator
 Key: FLINK-7665
 URL: https://issues.apache.org/jira/browse/FLINK-7665
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Affects Versions: 1.4.0
Reporter: Ufuk Celebi
Priority: Minor


{{ContinuousFileReaderOperator}} has the following loop to receive input splits:
{code}
synchronized (checkpointLock) {
  if (currentSplit == null) {
currentSplit = this.pendingSplits.poll();
if (currentSplit == null) {
  if (this.shouldClose) {
isRunning = false;
  } else {
checkpointLock.wait(50);
  }
  continue;
}
  }
}
{code}

I think we can replace this with a {{wait()}} and {{notify()}} in {{addSplit}} 
and {{close}}.

If there is a reason to keep the {{wait(50)}}, feel free to close this issue. 
:-)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7664) Replace FlinkFutureException by CompletionException

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175144#comment-16175144
 ] 

ASF GitHub Bot commented on FLINK-7664:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4701

[FLINK-7664] Replace FlinkFutureException by 
java.util.concurrent.CompletionException

## What is the purpose of the change

FlinkFutureException was introduced to fail a CompletableFuture callback. 
However, there
was already such a class which allows to better handle failures in 
different stages which
is the java.util.CompletionException. Therefore we replace 
FlinkFutureException by
CompletionException and remove the former.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink removeFlinkFutureException

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4701.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4701


commit 3b94ca218e6ed4c8fc0fe491d33c072473651a9b
Author: Till Rohrmann 
Date:   2017-09-21T17:21:07Z

[FLINK-7664] Replace FlinkFutureException by 
java.util.concurrent.CompletionException

FlinkFutureException was introduced to fail a CompletableFuture callback. 
However, there
was already such a class which allows to better handle failures in 
different stages which
is the java.util.CompletionException. Therefore we replace 
FlinkFutureException by
CompletionException and remove the former.




> Replace FlinkFutureException by CompletionException
> ---
>
> Key: FLINK-7664
> URL: https://issues.apache.org/jira/browse/FLINK-7664
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{FlinkFutureException}} was introduced to fail in a 
> {{CompletableFuture}} callback. This can, however, better be done via the 
> {{CompletionException}}. Therefore, we should remove the 
> {{FlinkFutureException}} and replace it with the {{CompletionException}} 
> instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4701: [FLINK-7664] Replace FlinkFutureException by java....

2017-09-21 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4701

[FLINK-7664] Replace FlinkFutureException by 
java.util.concurrent.CompletionException

## What is the purpose of the change

FlinkFutureException was introduced to fail a CompletableFuture callback. 
However, there
was already such a class which allows to better handle failures in 
different stages which
is the java.util.CompletionException. Therefore we replace 
FlinkFutureException by
CompletionException and remove the former.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink removeFlinkFutureException

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4701.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4701


commit 3b94ca218e6ed4c8fc0fe491d33c072473651a9b
Author: Till Rohrmann 
Date:   2017-09-21T17:21:07Z

[FLINK-7664] Replace FlinkFutureException by 
java.util.concurrent.CompletionException

FlinkFutureException was introduced to fail a CompletableFuture callback. 
However, there
was already such a class which allows to better handle failures in 
different stages which
is the java.util.CompletionException. Therefore we replace 
FlinkFutureException by
CompletionException and remove the former.




---


[jira] [Created] (FLINK-7664) Replace FlinkFutureException by CompletionException

2017-09-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7664:


 Summary: Replace FlinkFutureException by CompletionException
 Key: FLINK-7664
 URL: https://issues.apache.org/jira/browse/FLINK-7664
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The {{FlinkFutureException}} was introduced to fail in a {{CompletableFuture}} 
callback. This can, however, better be done via the {{CompletionException}}. 
Therefore, we should remove the {{FlinkFutureException}} and replace it with 
the {{CompletionException}} instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175091#comment-16175091
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140292203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
+* @return
+*/
+  override def extend(extendedFields: JList[RelDataTypeField]): 
RelOptTable =
+throw new UnsupportedOperationException
+
+  /**
+* Obtains an identifier for this table.
+*
+* @return qualified name
+*/
+  override def getQualifiedName: JList[String] = names
+
+
+  /**
+* Obtains the access type of the table.
+*
+* @return all access types including SELECT/UPDATE/INSERT/DELETE
+*/
+  override def getAllowedAccess: SqlAccessType = SqlAccessType.ALL
--- End diff --

I think we should make this depending on the type of `table`. 
With [FLINK-6442](https://issues.apache.org/jira/browse/FLINK-6442) we 
added support to register `TableSink` and `INSERT INTO sinkTable SELECT`. 

So if `table` is backed by a `DataSet`, `DataStream`, or `TableSource` is 
supports `SELECT`. If `table` is backed by a `TableSink` it supports `INSE

[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175093#comment-16175093
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140290245
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
--- End diff --

rename parameter to `qualifiedName`


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node 
> (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is instance of RelOptTableImpl now, and it will not 
> change after RelNode tree is built.
> 2. now all TableSourceScan contains a tablesource as constructor parameter, 
> so we could fetch the tablesource directly later.
>  
> The result tableSource is different with each other by above two ways after 
> apply project push(PPD) down or filter push down(FPD).  It is very confusing. 
> we hope to fix the problem by introducing FlinkRelOptTable to replace 
> RelOptTableImpl, and remove tableSource parameter from TableSourceScan's 
> constructor. After PPD or FPD,  a new FlinkRelOptTable instance which 
> contains a new TableSourceTable will be passed to TableSourceScan 
> constructor. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175090#comment-16175090
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140291045
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
--- End diff --

Remove is no parameter description is provided


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node 
> (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is instance of RelOptTableImpl now, and it will not 
> change after RelNode 

[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175092#comment-16175092
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140297123
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
--- End diff --

It looks as if most of the code was copied from `RelOptTableImpl`. 
I think a few methods can be simplified because some of the code does not 
apply to Flink and how we use Calcite. 


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node 
> (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is instance of RelOptTableImpl now, and it will not 
> change after RelNode tree is built.
> 2. now all TableSourceScan contains a tablesource as constructor parameter, 
> so we could fetch the tablesource directly later.
>  
> The result tableSource is different with each other by above two ways after 
> apply project push(PPD) down or filter push down(FPD).  It is very confusing. 
> we hope to fix the problem by introducing FlinkRelOptTable to replace 
> RelOptTableImpl, and remove tableSource parameter from TableSourceScan's 
> constructor. After PPD or FPD,  a new FlinkRelOptTable instance which 
> contains a new TableSourceTable will be passed to TableSourceSca

[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140297123
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
--- End diff --

It looks as if most of the code was copied from `RelOptTableImpl`. 
I think a few methods can be simplified because some of the code does not 
apply to Flink and how we use Calcite. 


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140291045
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
--- End diff --

Remove is no parameter description is provided


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140292203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
+* @return
+*/
+  override def extend(extendedFields: JList[RelDataTypeField]): 
RelOptTable =
+throw new UnsupportedOperationException
+
+  /**
+* Obtains an identifier for this table.
+*
+* @return qualified name
+*/
+  override def getQualifiedName: JList[String] = names
+
+
+  /**
+* Obtains the access type of the table.
+*
+* @return all access types including SELECT/UPDATE/INSERT/DELETE
+*/
+  override def getAllowedAccess: SqlAccessType = SqlAccessType.ALL
--- End diff --

I think we should make this depending on the type of `table`. 
With [FLINK-6442](https://issues.apache.org/jira/browse/FLINK-6442) we 
added support to register `TableSink` and `INSERT INTO sinkTable SELECT`. 

So if `table` is backed by a `DataSet`, `DataStream`, or `TableSource` is 
supports `SELECT`. If `table` is backed by a `TableSink` it supports `INSERT` 
(`DELETE` and `UPDATE` might be added later).


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140290245
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
--- End diff --

rename parameter to `qualifiedName`


---


[jira] [Commented] (FLINK-7649) Port JobStoppingHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175080#comment-16175080
 ] 

ASF GitHub Bot commented on FLINK-7649:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4700

[FLINK-7649] [flip6] Extend JobTerminationHandler to support stop

## What is the purpose of the change

Rename the JobCancellationHandler into JobTerminationHandler which is now 
responsible
for terminating jobs. Moreover, this commits adds two termination modes, 
cancel and stop,
which are specified by providing a query parameter.

## Brief change log

- Rename `JobCancellationHandler` into `JobTerminationHandler` being now 
responsible for the job termination (cancelling and stopping so far)
- Introduce a `TerminationModeQueryParameter` which allows to specify the 
termination mode (`stop` and `cancel`)
- Add `stopJob` to `DispatcherGateway` and `stop` to `JobMasterGateway`
- Adapt `JobTerminationHandler` to either call `cancelJob` or `stopJob` on 
the `Dispatcher` depending on the mode (default is `cancel`)

## Verifying this change

Tested manual that the right termination call is executed.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portJobStoppingHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4700.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4700


commit afa456f68a865ca5cf4aa1f60c2f1d43ff471e1b
Author: Till Rohrmann 
Date:   2017-09-21T08:53:24Z

[FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint

Let the JobCancellationHandler implement the LegacyRestHandler interface. 
Moreover,
this commit adds the DELETE method to HttpMethodWrapper and the
RestServerEndpoint#registerHandler method.

commit a8f695607a1aa48d2af9175c47ed26dd965d9c05
Author: Till Rohrmann 
Date:   2017-09-21T14:47:18Z

[FLINK-7649] [flip6] Extend JobTerminationHandler to support stop

Rename the JobCancellationHandler into JobTerminationHandler which is now 
responsible
for terminating jobs. Moreover, this commits adds two termination modes, 
cancel and stop,
which are specified by providing a query parameter.




> Port JobStoppingHandler to new REST endpoint
> 
>
> Key: FLINK-7649
> URL: https://issues.apache.org/jira/browse/FLINK-7649
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobStoppingHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4700: [FLINK-7649] [flip6] Extend JobTerminationHandler ...

2017-09-21 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4700

[FLINK-7649] [flip6] Extend JobTerminationHandler to support stop

## What is the purpose of the change

Rename the JobCancellationHandler into JobTerminationHandler which is now 
responsible
for terminating jobs. Moreover, this commits adds two termination modes, 
cancel and stop,
which are specified by providing a query parameter.

## Brief change log

- Rename `JobCancellationHandler` into `JobTerminationHandler` being now 
responsible for the job termination (cancelling and stopping so far)
- Introduce a `TerminationModeQueryParameter` which allows to specify the 
termination mode (`stop` and `cancel`)
- Add `stopJob` to `DispatcherGateway` and `stop` to `JobMasterGateway`
- Adapt `JobTerminationHandler` to either call `cancelJob` or `stopJob` on 
the `Dispatcher` depending on the mode (default is `cancel`)

## Verifying this change

Tested manual that the right termination call is executed.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portJobStoppingHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4700.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4700


commit afa456f68a865ca5cf4aa1f60c2f1d43ff471e1b
Author: Till Rohrmann 
Date:   2017-09-21T08:53:24Z

[FLINK-7650] [flip6] Port JobCancellationHandler to new REST endpoint

Let the JobCancellationHandler implement the LegacyRestHandler interface. 
Moreover,
this commit adds the DELETE method to HttpMethodWrapper and the
RestServerEndpoint#registerHandler method.

commit a8f695607a1aa48d2af9175c47ed26dd965d9c05
Author: Till Rohrmann 
Date:   2017-09-21T14:47:18Z

[FLINK-7649] [flip6] Extend JobTerminationHandler to support stop

Rename the JobCancellationHandler into JobTerminationHandler which is now 
responsible
for terminating jobs. Moreover, this commits adds two termination modes, 
cancel and stop,
which are specified by providing a query parameter.




---


[GitHub] flink pull request #4699: [FLINK-7663] [flip6] Return BAD_REQUEST if Handler...

2017-09-21 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4699

[FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created

## What is the purpose of the change

This commit changes the behaviour such that a failure in creating a 
`HandlerRequest` will
result in a BAD_REQUEST response by the AbstractRestHandler.

## Brief change log

- Introduce the `HandlerRequestException` indicating that the 
`HandlerRequest` could not be created
- Introduce a checked `ConversionException` for 
`MessageParameter#convertFromString` to let conversions fail explicitly
- Send `BAD_REQUEST` response in case that the `HandlerRequest` could not 
be created

## Verifying this change

- Added `RestEndpointITCase#testBadHandlerRequest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

R @zentol 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink supportBadRestRequests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4699.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4699


commit 38c8a188ad72a1ce48129077b71df971ade44354
Author: Till Rohrmann 
Date:   2017-09-21T16:14:45Z

[FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created

This commit changes the behaviour such that a failure in creating a 
HandlerRequest will
result in a BAD_REQUEST response by the AbstractRestHandler.




---


[jira] [Commented] (FLINK-7663) Allow AbstractRestHandler to handle bad requests

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175050#comment-16175050
 ] 

ASF GitHub Bot commented on FLINK-7663:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4699

[FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created

## What is the purpose of the change

This commit changes the behaviour such that a failure in creating a 
`HandlerRequest` will
result in a BAD_REQUEST response by the AbstractRestHandler.

## Brief change log

- Introduce the `HandlerRequestException` indicating that the 
`HandlerRequest` could not be created
- Introduce a checked `ConversionException` for 
`MessageParameter#convertFromString` to let conversions fail explicitly
- Send `BAD_REQUEST` response in case that the `HandlerRequest` could not 
be created

## Verifying this change

- Added `RestEndpointITCase#testBadHandlerRequest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

R @zentol 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink supportBadRestRequests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4699.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4699


commit 38c8a188ad72a1ce48129077b71df971ade44354
Author: Till Rohrmann 
Date:   2017-09-21T16:14:45Z

[FLINK-7663] [flip6] Return BAD_REQUEST if HandlerRequest cannot be created

This commit changes the behaviour such that a failure in creating a 
HandlerRequest will
result in a BAD_REQUEST response by the AbstractRestHandler.




> Allow AbstractRestHandler to handle bad requests
> 
>
> Key: FLINK-7663
> URL: https://issues.apache.org/jira/browse/FLINK-7663
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{AbstractRestHandler}} parses the request and tries to generate a 
> {{HandlerRequest}}. If this fails, then the server answers with an internal 
> server error. Instead we should allow the {{AbstractRestHandler}} to be able 
> to return a BAD_REQUEST status code. In order to do that, I would like to 
> introduce a {{HandlerRequestException}} which can be thrown while creating 
> the {{HandlerRequest}}. If this exception is thrown, then we return an error 
> message with {{BAD_REQUEST}} {{HttpResponseStatus}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175036#comment-16175036
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140291476
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
--- End diff --

Will change it.


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175034#comment-16175034
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140291389
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
-   for (JobDetails detail 
: result.getFinishedJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getFinished()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
} else {
--- End diff --

Yes, because I think it's not so nice to return different answer formats 
depending on how the handler was instantiated. The respective information can 
be easily extracted from the complete overview. Moreover, I think we never used 
these endpoints in the first place.


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16175035#comment-16175035
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140291439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
--- End diff --

True, will change it.


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140291439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
--- End diff --

True, will change it.


---


[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140291476
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
--- End diff --

Will change it.


---


[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140291389
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
-   for (JobDetails detail 
: result.getFinishedJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getFinished()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
} else {
--- End diff --

Yes, because I think it's not so nice to return different answer formats 
depending on how the handler was instantiated. The respective information can 
be easily extracted from the complete overview. Moreover, I think we never used 
these endpoints in the first place.


---


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-09-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174974#comment-16174974
 ] 

Aljoscha Krettek commented on FLINK-7623:
-

I think that would be incorrect, because then it would also report "not 
restored" in case the operator had no state assigned but (globally) was still 
restored, i.e. some other parallel instances of this operator might have state.

> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where multiple of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. It's best exemplified using this minimal example 
> where both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I do a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint {{context.isRestored()}} still reports {{true}} 
> for the source.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174972#comment-16174972
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140281028
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
--- End diff --

"running" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING`


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174973#comment-16174973
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140281116
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
--- End diff --

"finished" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED`


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140281116
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
--- End diff --

"finished" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED`


---


[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140280832
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
-   for (JobDetails detail 
: result.getFinishedJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getFinished()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
} else {
--- End diff --

On a second look, I think this response variant is not covered by the 
ported handler.


---


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174969#comment-16174969
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140280832
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
-   for (JobDetails detail 
: result.getRunningJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getRunning()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
 

gen.writeArrayFieldStart("finished");
-   for (JobDetails detail 
: result.getFinishedJobs()) {
-   
writeJobDetailOverviewAsJson(detail, gen, now);
+   for (JobDetails detail 
: result.getFinished()) {
+   
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
} else {
--- End diff --

On a second look, I think this response variant is not covered by the 
ported handler.


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandl...

2017-09-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140281028
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 ---
@@ -92,24 +101,26 @@ public CurrentJobsOverviewHandler(
StringWriter writer = new 
StringWriter();
try {
JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+   final 
JobDetails.JobDetailsSerializer jobDetailsSerializer = new 
JobDetails.JobDetailsSerializer();
+
gen.writeStartObject();
 
if (includeRunningJobs && 
includeFinishedJobs) {

gen.writeArrayFieldStart("running");
--- End diff --

"running" --> can use `MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING`


---


[jira] [Commented] (FLINK-7439) Support variable arguments for UDTF in SQL

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174953#comment-16174953
 ] 

ASF GitHub Bot commented on FLINK-7439:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4536#discussion_r140276174
  
--- Diff: docs/dev/table/udfs.md ---
@@ -297,7 +297,7 @@ optionally implemented. While some of these methods 
allow the system more effici
 - `merge()` is required for many batch aggreagtions and session window 
aggregations.
 - `resetAccumulator()` is required for many batch aggregations.
 
-All methods of `AggregateFunction` must be declared as `public`, not 
`static` and named exactly as the names mentioned above. The methods 
`createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are 
defined in the `AggregateFunction` abstract class, while others are contracted 
methods. In order to define a table function, one has to extend the base class 
`org.apache.flink.table.functions.AggregateFunction` and implement one (or 
more) `accumulate` methods. 
--- End diff --

"The method `accumulate` can be overloaded with different custom types and 
arguments and also support variable arguments."

->

"The method `accumulate` can be overloaded with different parameter types 
and supports variable arguments."


> Support variable arguments for UDTF in SQL
> --
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF not. 
> FLINK-5882 supports variable UDTF for Table API only, but missed SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...

2017-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4536#discussion_r140276174
  
--- Diff: docs/dev/table/udfs.md ---
@@ -297,7 +297,7 @@ optionally implemented. While some of these methods 
allow the system more effici
 - `merge()` is required for many batch aggreagtions and session window 
aggregations.
 - `resetAccumulator()` is required for many batch aggregations.
 
-All methods of `AggregateFunction` must be declared as `public`, not 
`static` and named exactly as the names mentioned above. The methods 
`createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are 
defined in the `AggregateFunction` abstract class, while others are contracted 
methods. In order to define a table function, one has to extend the base class 
`org.apache.flink.table.functions.AggregateFunction` and implement one (or 
more) `accumulate` methods. 
--- End diff --

"The method `accumulate` can be overloaded with different custom types and 
arguments and also support variable arguments."

->

"The method `accumulate` can be overloaded with different parameter types 
and supports variable arguments."


---


[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174926#comment-16174926
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140271306
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws 
IOException {
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
 
-   String msg = "Wrong number of available segments after create 
buffer pool and request segments.";
+   String msg = "Wrong number of available segments after creating 
buffer pool and requesting segments.";
--- End diff --

still "buffer pool**s**"


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4485: [FLINK-7378][core]Create a fix size (non rebalanci...

2017-09-21 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r140271306
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 ---
@@ -186,7 +186,7 @@ public void testUniformDistributionBounded4() throws 
IOException {
assertEquals(2, first.getNumBuffers());
assertEquals(2, second.getNumBuffers());
 
-   String msg = "Wrong number of available segments after create 
buffer pool and request segments.";
+   String msg = "Wrong number of available segments after creating 
buffer pool and requesting segments.";
--- End diff --

still "buffer pool**s**"


---


[jira] [Created] (FLINK-7663) Allow AbstractRestHandler to handle bad requests

2017-09-21 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7663:


 Summary: Allow AbstractRestHandler to handle bad requests
 Key: FLINK-7663
 URL: https://issues.apache.org/jira/browse/FLINK-7663
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The {{AbstractRestHandler}} parses the request and tries to generate a 
{{HandlerRequest}}. If this fails, then the server answers with an internal 
server error. Instead we should allow the {{AbstractRestHandler}} to be able to 
return a BAD_REQUEST status code. In order to do that, I would like to 
introduce a {{HandlerRequestException}} which can be thrown while creating the 
{{HandlerRequest}}. If this exception is thrown, then we return an error 
message with {{BAD_REQUEST}} {{HttpResponseStatus}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174882#comment-16174882
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140266297
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeS

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140266297
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,
  

[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174880#comment-16174880
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4691
  
Sorry about that, local Travis tests pass, should be fine now.


> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4691: [FLINK-7647] [flip6] Port JobManagerConfigHandler to new ...

2017-09-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4691
  
Sorry about that, local Travis tests pass, should be fine now.


---


[jira] [Assigned] (FLINK-7652) Port CurrentJobIdsHandler to new REST endpoint

2017-09-21 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-7652:
--

Assignee: Tzu-Li (Gordon) Tai

> Port CurrentJobIdsHandler to new REST endpoint
> --
>
> Key: FLINK-7652
> URL: https://issues.apache.org/jira/browse/FLINK-7652
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CurrentJobIdsHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-09-21 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-7648:
--

Assignee: (was: Tzu-Li (Gordon) Tai)

> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174875#comment-16174875
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140263201
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import 
org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  * @param timeIndicator   indicate whether joining on proctime or rowtime
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int,
+private val timeIndicator: JoinTimeIndicator)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  //For delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negati

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140263201
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import 
org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  * @param timeIndicator   indicate whether joining on proctime or rowtime
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int,
+private val timeIndicator: JoinTimeIndicator)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  //For delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and emitting it (as 
part of a joined result).
+* Only reasonable for row time join.
+*
+* @return the maximum delay for the outputs
+  

[jira] [Created] (FLINK-7662) Remove unnecessary packaged licenses

2017-09-21 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7662:
---

 Summary: Remove unnecessary packaged licenses
 Key: FLINK-7662
 URL: https://issues.apache.org/jira/browse/FLINK-7662
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.4.0


With the new shading approach, we no longer shade ASM into Flink artifacts, so 
we do not need to package the ASM license into those artifacts any more.

Instead, a shaded ASM artifact already containing a packaged license is used in 
the distribution build.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140262200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174868#comment-16174868
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140262200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativ

[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140258895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 ---
@@ -309,7 +309,7 @@ public boolean write(T record) throws IOException {
//   Access Utilities
// 


-   private long readPointer(int logicalPosition) {
+   protected long readPointer(int logicalPosition) {
--- End diff --

`protected final` with a comment why is it so should be ok.


---


[GitHub] flink pull request #3511: [Flink-5734] code generation for normalizedkey sor...

2017-09-21 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/3511#discussion_r140258675
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
 ---
@@ -47,61 +47,61 @@

public static final int MAX_NORMALIZED_KEY_LEN_PER_ELEMENT = 8;

-   private static final int MIN_REQUIRED_BUFFERS = 3;
+   public static final int MIN_REQUIRED_BUFFERS = 3;

-   private static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;
+   public static final int LARGE_RECORD_THRESHOLD = 10 * 1024 * 1024;

-   private static final long LARGE_RECORD_TAG = 1L << 63;
+   public static final long LARGE_RECORD_TAG = 1L << 63;

-   private static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
+   public static final long POINTER_MASK = LARGE_RECORD_TAG - 1;
 
--- End diff --

Maybe put that into the comment inside the code?


---


[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174841#comment-16174841
 ] 

ASF GitHub Bot commented on FLINK-7661:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/4698

[FLINK-7661][network] Add credit field in PartitionRequest message

## What is the purpose of the change

*`PartitionRequest` message adds the credit field which corresponds to the 
number of exclusive segments in `InputChannel`*.

*This pull request is based on 
[4499](https://github.com/apache/flink/pull/4499) whose commit is also included 
for passing travis. Review the third commit for this PR change*.

## Brief change log

  - *Add credit field in `PartitionRequest` message*
  - *Add `getInitialCredit()` method in `RemoteInputChannel`*

## Verifying this change

This change is already covered by existing tests, such as 
*NettyMessageSerializationTest*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-7661

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4698.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4698


commit 693303c5d720fd720865f60aaf9a59f4bf547396
Author: Zhijiang 
Date:   2017-08-07T09:31:17Z

[FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for 
the floating buffers

commit 4c582cba58dbdfb0485b3d23375a62428a160b2b
Author: Zhijiang 
Date:   2017-08-14T06:30:47Z

[FLINK-7394][core] Manage exclusive buffers in RemoteInputChannel

commit b9e0447202a7621adf8bd8646e7d2a54dca00f2d
Author: Zhijiang 
Date:   2017-09-21T08:28:16Z

[FLINK-7661][network] Add credit field in PartitionRequest message




> Add credit field in PartitionRequest message
> 
>
> Key: FLINK-7661
> URL: https://issues.apache.org/jira/browse/FLINK-7661
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: zhijiang
>Assignee: zhijiang
>
> Currently the {{PartitionRequest}} message contains {{ResultPartitionID}} | 
> {{queueIndex}} | {{InputChannelID}} fields.
> We will add a new {{credit}} field indicating the initial credit of 
> {{InputChannel}}, and this info can be got from {{InputChannel}} directly 
> after assigning exclusive buffers to it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4698: [FLINK-7661][network] Add credit field in Partitio...

2017-09-21 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/4698

[FLINK-7661][network] Add credit field in PartitionRequest message

## What is the purpose of the change

*`PartitionRequest` message adds the credit field which corresponds to the 
number of exclusive segments in `InputChannel`*.

*This pull request is based on 
[4499](https://github.com/apache/flink/pull/4499) whose commit is also included 
for passing travis. Review the third commit for this PR change*.

## Brief change log

  - *Add credit field in `PartitionRequest` message*
  - *Add `getInitialCredit()` method in `RemoteInputChannel`*

## Verifying this change

This change is already covered by existing tests, such as 
*NettyMessageSerializationTest*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-7661

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4698.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4698


commit 693303c5d720fd720865f60aaf9a59f4bf547396
Author: Zhijiang 
Date:   2017-08-07T09:31:17Z

[FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for 
the floating buffers

commit 4c582cba58dbdfb0485b3d23375a62428a160b2b
Author: Zhijiang 
Date:   2017-08-14T06:30:47Z

[FLINK-7394][core] Manage exclusive buffers in RemoteInputChannel

commit b9e0447202a7621adf8bd8646e7d2a54dca00f2d
Author: Zhijiang 
Date:   2017-09-21T08:28:16Z

[FLINK-7661][network] Add credit field in PartitionRequest message




---


[GitHub] flink pull request #4695: [FLINK-7661][network] Add credit field in Partitio...

2017-09-21 Thread zhijiangW
Github user zhijiangW closed the pull request at:

https://github.com/apache/flink/pull/4695


---


[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174839#comment-16174839
 ] 

ASF GitHub Bot commented on FLINK-7661:
---

Github user zhijiangW closed the pull request at:

https://github.com/apache/flink/pull/4695


> Add credit field in PartitionRequest message
> 
>
> Key: FLINK-7661
> URL: https://issues.apache.org/jira/browse/FLINK-7661
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: zhijiang
>Assignee: zhijiang
>
> Currently the {{PartitionRequest}} message contains {{ResultPartitionID}} | 
> {{queueIndex}} | {{InputChannelID}} fields.
> We will add a new {{credit}} field indicating the initial credit of 
> {{InputChannel}}, and this info can be got from {{InputChannel}} directly 
> after assigning exclusive buffers to it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140255052
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import 
org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  * @param timeIndicator   indicate whether joining on proctime or rowtime
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int,
+private val timeIndicator: JoinTimeIndicator)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  //For delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative.")
+  }
+
+  /**
+* Get the maximum interval between receiving a row and emitting it (as 
part of a joined result).
+* Only reasonable for row time join.
+*
+* @return the maximum delay for the outputs
+

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174834#comment-16174834
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140255052
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import 
org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - 
Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the 
criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the 
criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftTypethe input type of left stream
+  * @param rightType   the input type of right stream
+  * @param genJoinFuncName the function code of other non-equi conditions
+  * @param genJoinFuncCode the function name of other non-equi conditions
+  * @param timeIndicator   indicate whether joining on proctime or rowtime
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val allowedLateness: Long,
+private val leftType: TypeInformation[Row],
+private val rightType: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String,
+private val leftTimeIdx: Int,
+private val rightTimeIdx: Int,
+private val timeIndicator: JoinTimeIndicator)
+extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  //For delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+throw new IllegalArgumentException("The allowed lateness must be 
non-negative

[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174815#comment-16174815
 ] 

ASF GitHub Bot commented on FLINK-6233:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140251765
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeS

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140251765
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + 
"InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   leftOperatorTime,
  

[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174757#comment-16174757
 ] 

ASF GitHub Bot commented on FLINK-7378:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , sorry for the typos. I have submitted the updates.


> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for 
> {{SingleInputGate}} is limited by {{a *  + b}}, where a 
> is the number of exclusive buffers for each channel and b is the number of 
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix 
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}. 
> And the exclusive buffers are assigned to {{InputChannel}} directly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4485: [FLINK-7378][core]Create a fix size (non rebalancing) buf...

2017-09-21 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4485
  
@NicoK , sorry for the typos. I have submitted the updates.


---


[jira] [Commented] (FLINK-7638) Port CurrentJobsOverviewHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174746#comment-16174746
 ] 

ASF GitHub Bot commented on FLINK-7638:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4688
  
@tillrohrmann Yes that makes sense, lets do that separately in the future 
then.

+1.


> Port CurrentJobsOverviewHandler to new REST endpoint
> 
>
> Key: FLINK-7638
> URL: https://issues.apache.org/jira/browse/FLINK-7638
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> Port the existing {{CurrentJobsOverviewHandler}} to the new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4688: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to n...

2017-09-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4688
  
@tillrohrmann Yes that makes sense, lets do that separately in the future 
then.

+1.


---


[jira] [Commented] (FLINK-7502) PrometheusReporter improvements

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174734#comment-16174734
 ] 

ASF GitHub Bot commented on FLINK-7502:
---

Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
@zentol It would be great if you could have another look on occasion, I 
added better handling for metrics that are registered e.g. by different 
subtasks.

[green travis](https://travis-ci.org/mbode/flink/builds/274685138)


> PrometheusReporter improvements
> ---
>
> Key: FLINK-7502
> URL: https://issues.apache.org/jira/browse/FLINK-7502
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
>
> * do not throw exceptions on metrics being registered for second time
> * allow port ranges for setups where multiple reporters are on same host 
> (e.g. one TaskManager and one JobManager)
> * do not use nanohttpd anymore, there is now a minimal http server included 
> in [Prometheus JVM client|https://github.com/prometheus/client_java]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter

2017-09-21 Thread mbode
Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
@zentol It would be great if you could have another look on occasion, I 
added better handling for metrics that are registered e.g. by different 
subtasks.

[green travis](https://travis-ci.org/mbode/flink/builds/274685138)


---


[jira] [Assigned] (FLINK-7649) Port JobStoppingHandler to new REST endpoint

2017-09-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7649:


Assignee: Till Rohrmann

> Port JobStoppingHandler to new REST endpoint
> 
>
> Key: FLINK-7649
> URL: https://issues.apache.org/jira/browse/FLINK-7649
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing `JobStoppingHandler` to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7649) Port JobStoppingHandler to new REST endpoint

2017-09-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7649:
-
Description: Port existing {{JobStoppingHandler}} to new REST endpoint  
(was: Port existing `JobStoppingHandler` to new REST endpoint)

> Port JobStoppingHandler to new REST endpoint
> 
>
> Key: FLINK-7649
> URL: https://issues.apache.org/jira/browse/FLINK-7649
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobStoppingHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7647) Port JobManagerConfigHandler to new REST endpoint

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16174731#comment-16174731
 ] 

ASF GitHub Bot commented on FLINK-7647:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4691
  
Still failing with `ClusterConfigHandlerTest.testGetPaths:32 NullPointer`


> Port JobManagerConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7647
> URL: https://issues.apache.org/jira/browse/FLINK-7647
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobManagerConfigHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


  1   2   >