[GitHub] flink issue #4315: [FLINK-5541] Missing null check for localJar in FlinkSubm...

2017-07-13 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4315
  
@tedyu What do you think of this change?




[jira] [Commented] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085286#comment-16085286
 ] 

ASF GitHub Bot commented on FLINK-5541:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4315
  
@tedyu What do you think of this change?


> Missing null check for localJar in FlinkSubmitter#submitTopology()
> --
>
> Key: FLINK-5541
> URL: https://issues.apache.org/jira/browse/FLINK-5541
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Minor
>
> {code}
>   if (localJar == null) {
> try {
>   for (final URL url : ((ContextEnvironment) 
> ExecutionEnvironment.getExecutionEnvironment())
>   .getJars()) {
> // TODO verify that there is only one jar
> localJar = new File(url.toURI()).getAbsolutePath();
>   }
> } catch (final URISyntaxException e) {
>   // ignore
> } catch (final ClassCastException e) {
>   // ignore
> }
>   }
>   logger.info("Submitting topology " + name + " in distributed mode with 
> conf " + serConf);
>   client.submitTopologyWithOpts(name, localJar, topology);
> {code}
> Since the try block may encounter URISyntaxException / ClassCastException, we 
> should check that localJar is not null before calling 
> submitTopologyWithOpts().
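
A minimal sketch of the guard this ticket asks for, reusing the names from the
snippet above (the exact exception type and message are an editor's assumption,
not taken from the eventual patch):
{code}
  if (localJar == null) {
    // fail fast instead of handing a null jar path to the Storm client
    throw new IllegalStateException(
        "Could not find a user jar to submit for topology " + name + ".");
  }
  logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
  client.submitTopologyWithOpts(name, localJar, topology);
{code}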





[jira] [Updated] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-07-13 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-6951:

Flags: Important
Fix Version/s: 1.3.2

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.





[jira] [Updated] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-07-13 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-6951:

Fix Version/s: (was: 1.3.2)
   1.4.0

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.





[jira] [Updated] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-07-13 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-6951:

Fix Version/s: 1.3.2

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.2
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.





[jira] [Updated] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-7169:
---
Issue Type: Sub-task  (was: Task)
Parent: FLINK-6935

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how 
> the CEP library currently behaves.
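
A hypothetical usage of the proposed API. `Event` is a placeholder input type,
and `AfterMatchSkipStrategy` with its `skipPastLastRow()` factory method is an
assumption based on the sketch above, not an existing Flink API:
{code}
// resume matching after the last row of each match (AFTER MATCH SKIP PAST LAST ROW)
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedBy("end");
PatternStream<Event> matches =
    CEP.pattern(input, pattern, AfterMatchSkipStrategy.skipPastLastRow());
{code}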





[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085290#comment-16085290
 ] 

Jark Wu commented on FLINK-7169:


Thanks for your contribution. I moved this issue under FLINK-6935.

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how 
> the CEP library currently behaves.





[GitHub] flink pull request #4316: [FLINK-6105] Use InterruptedIOException instead of...

2017-07-13 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4316

[FLINK-6105] Use InterruptedIOException instead of IOException

This is my first commit for this issue. There might be other places that also 
need this change, but I am not sure yet.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6105

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4316.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4316


commit 8fb6cf5926643e4c7de967eafa0cef358f174431
Author: zhangminglei 
Date:   2017-07-13T07:23:20Z

[FLINK-6105] Use InterruptedIOException instead of IOException






[jira] [Commented] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085305#comment-16085305
 ] 

ASF GitHub Bot commented on FLINK-6105:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4316

[FLINK-6105] Use InterruptedIOException instead of IOException

This is my first commit for this issue. There might be other places that also 
need this change, but I am not sure yet.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6105

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4316.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4316


commit 8fb6cf5926643e4c7de967eafa0cef358f174431
Author: zhangminglei 
Date:   2017-07-13T07:23:20Z

[FLINK-6105] Use InterruptedIOException instead of IOException




> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
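
A sketch of the proposed change for the snippet above. Restoring the interrupt
flag is an extra precaution, not something the ticket spells out:
{code}
try {
  splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
  // preserve the thread's interrupted status for callers further up the stack
  Thread.currentThread().interrupt();
  // InterruptedIOException(String) takes no cause argument, so attach it explicitly
  InterruptedIOException iioe = new InterruptedIOException("Could not get Splits.");
  iioe.initCause(e);
  throw iioe;
}
{code}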





[jira] [Updated] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-07-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6951:
---
Priority: Blocker  (was: Major)

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.





[jira] [Issue Comment Deleted] (FLINK-7165) Flink can't deploy on hadoop 2.6.0-cdh5.5.1 yarn

2017-07-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

刘喆 updated FLINK-7165:
--
Comment: was deleted

(was: When using Flink 1.2.1 for Hadoop 2.6, it works fine. But 1.2.1 
lacks some features I need.)

> Flink can't deploy on hadoop 2.6.0-cdh5.5.1 yarn
> 
>
> Key: FLINK-7165
> URL: https://issues.apache.org/jira/browse/FLINK-7165
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.1, 1.4.0
> Environment: Flink 1.3.1 and Flink 1.4-SNAPSHOT
> hadoop 2.6.0-cdh5.5.1
>Reporter: 刘喆
>
> When run with:
> {quote}
>  HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster  -yn 300 
> -yjm 4192 -ytm 1124 -c fTest.StreamingJob -p 300 ./fT.jar
> {quote}
> It failed.
> Logs as below:
> {quote}
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote keytab 
> principal obtained null
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote yarn conf path 
> obtained null
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote krb5 path 
> obtained null
> 2017-07-12 17:49:44.986 [main] ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner  - YARN Application Master 
> initialization failed
> java.lang.AbstractMethodError: 
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy()Ljava/lang/Object;
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:73)
>  ~[fT.jar:na]
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:64)
>  ~[fT.jar:na]
> at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58) 
> ~[fT.jar:na]
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:183) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:665) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:601) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
>  ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90) 
> ~[fT.jar:na]
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2350) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2332) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) ~[fT.jar:na]
> at 
> org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:380) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:318)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:191)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:188)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at java.security.AccessController.doPrivileged(Native Method) 
> [na:1.8.0_112]
> at javax.security.auth.Subject.doAs(Subject.java:422) [na:1.8.0_112]
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>  [fT.jar:na]
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.run(YarnApplicationMasterRunner.java:188)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.main(YarnApplicationMasterRunner.java:112)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> {quote}





[jira] [Comment Edited] (FLINK-7165) Flink can't deploy on hadoop 2.6.0-cdh5.5.1 yarn

2017-07-13 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084246#comment-16084246
 ] 

刘喆 edited comment on FLINK-7165 at 7/13/17 7:41 AM:


I have tried the following approaches:
# Tried the Flink build for Hadoop 2.6.x; I got the same error.
# Built from source against hadoop 2.6.0-cdh5.5.1; I got the same error.
# Tried using the Hadoop jars on the CLASSPATH; it works for 1.3.1, but not 
for 1.4-SNAPSHOT. Then I found that 1.4-SNAPSHOT uses Scala 2.11 by default, 
but some of my jars need Scala 2.10.

Anyway:
# It works.
# Can we make it work with no trick?



was (Author: liuzhe):
I have tried the following approaches:
# Tried the Flink build for Hadoop 2.6.x; I got the same error.
# Built from source against hadoop 2.6.0-cdh5.5.1; I got the same error.
# Tried using the Hadoop jars, but then I got errors from other classes.


> Flink can't deploy on hadoop 2.6.0-cdh5.5.1 yarn
> 
>
> Key: FLINK-7165
> URL: https://issues.apache.org/jira/browse/FLINK-7165
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.1, 1.4.0
> Environment: Flink 1.3.1 and Flink 1.4-SNAPSHOT
> hadoop 2.6.0-cdh5.5.1
>Reporter: 刘喆
>
> When run with:
> {quote}
>  HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster  -yn 300 
> -yjm 4192 -ytm 1124 -c fTest.StreamingJob -p 300 ./fT.jar
> {quote}
> It failed.
> Logs as below:
> {quote}
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote keytab 
> principal obtained null
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote yarn conf path 
> obtained null
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote krb5 path 
> obtained null
> 2017-07-12 17:49:44.986 [main] ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner  - YARN Application Master 
> initialization failed
> java.lang.AbstractMethodError: 
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy()Ljava/lang/Object;
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:73)
>  ~[fT.jar:na]
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:64)
>  ~[fT.jar:na]
> at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58) 
> ~[fT.jar:na]
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:183) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:665) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:601) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
>  ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90) 
> ~[fT.jar:na]
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2350) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2332) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) ~[fT.jar:na]
> at 
> org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:380) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:318)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:191)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:188)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at java.security.AccessController.doPrivileged(Native Method) 
> [na:1.8.0_112]
> at javax.security.auth.Subject.doAs(Subject.java:422) [na:1.8.0_112]
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>  [fT.jar:na]
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.

[jira] [Comment Edited] (FLINK-7165) Flink can't deploy on hadoop 2.6.0-cdh5.5.1 yarn

2017-07-13 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-7165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084246#comment-16084246
 ] 

刘喆 edited comment on FLINK-7165 at 7/13/17 7:41 AM:


I have tried the following approaches:
# Tried the Flink build for Hadoop 2.6.x; I got the same error.
# Built from source against hadoop 2.6.0-cdh5.5.1; I got the same error.
# Tried using the Hadoop jars on the CLASSPATH; it works for 1.3.1, but not 
for 1.4-SNAPSHOT. Then I found that 1.4-SNAPSHOT uses Scala 2.11 by default, 
but some of my jars need Scala 2.10.

Anyway:
# It works.
# Can we make it work without a trick?



was (Author: liuzhe):
I have tried the following approaches:
# Tried the Flink build for Hadoop 2.6.x; I got the same error.
# Built from source against hadoop 2.6.0-cdh5.5.1; I got the same error.
# Tried using the Hadoop jars on the CLASSPATH; it works for 1.3.1, but not 
for 1.4-SNAPSHOT. Then I found that 1.4-SNAPSHOT uses Scala 2.11 by default, 
but some of my jars need Scala 2.10.

Anyway:
# It works.
# Can we make it work with no trick?


> Flink can't deploy on hadoop 2.6.0-cdh5.5.1 yarn
> 
>
> Key: FLINK-7165
> URL: https://issues.apache.org/jira/browse/FLINK-7165
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.1, 1.4.0
> Environment: Flink 1.3.1 and Flink 1.4-SNAPSHOT
> hadoop 2.6.0-cdh5.5.1
>Reporter: 刘喆
>
> When run with:
> {quote}
>  HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster  -yn 300 
> -yjm 4192 -ytm 1124 -c fTest.StreamingJob -p 300 ./fT.jar
> {quote}
> It failed.
> Logs as below:
> {quote}
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote keytab 
> principal obtained null
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote yarn conf path 
> obtained null
> 2017-07-12 17:49:44.774 [main] INFO  
> org.apache.flink.yarn.YarnApplicationMasterRunner  - TM:remote krb5 path 
> obtained null
> 2017-07-12 17:49:44.986 [main] ERROR 
> org.apache.flink.yarn.YarnApplicationMasterRunner  - YARN Application Master 
> initialization failed
> java.lang.AbstractMethodError: 
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy()Ljava/lang/Object;
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:73)
>  ~[fT.jar:na]
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:64)
>  ~[fT.jar:na]
> at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58) 
> ~[fT.jar:na]
> at 
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:183) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:665) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:601) 
> ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
>  ~[flink-shaded-hadoop2-uber-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90) 
> ~[fT.jar:na]
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2350) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2332) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369) 
> ~[fT.jar:na]
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) ~[fT.jar:na]
> at 
> org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:380) 
> ~[flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner.runApplicationMaster(YarnApplicationMasterRunner.java:318)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:191)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.yarn.YarnApplicationMasterRunner$1.call(YarnApplicationMasterRunner.java:188)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>  [flink-dist_2.11-1.4-SNAPSHOT.jar:1.4-SNAPSHOT]
> at java.security.AccessController.doPrivileged(Native Method) 
> [na:1.8.0_112]
> at javax.security.auth.Subject.doAs(Subject.java:422) [na:1.8.0_112]
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(Use

[GitHub] flink issue #4150: [FLINK-6951] Incompatible versions of httpcomponents jars...

2017-07-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4150
  
Thanks! Just to double-check: have you already verified on your side that 
this works when you're using the connector with S3 (which caused the issue for 
you before)?




[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085313#comment-16085313
 ] 

ASF GitHub Bot commented on FLINK-6951:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4150
  
Thanks! Just to double-check: have you already verified on your side that 
this works when you're using the connector with S3 (which caused the issue for 
you before)?


> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.





[jira] [Commented] (FLINK-6617) Improve JAVA and SCALA logical plans consistent test

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085318#comment-16085318
 ] 

ASF GitHub Bot commented on FLINK-6617:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3943
  
Merging...


> Improve JAVA and SCALA logical plans consistent test
> 
>
> Key: FLINK-6617
> URL: https://issues.apache.org/jira/browse/FLINK-6617
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, we need some `StringExpression` tests for all Java and Scala APIs,
> such as `GroupAggregations`, `GroupWindowAggregation` (Session, Tumble), 
> `Calc`, etc.





[GitHub] flink issue #3943: [FLINK-6617][table] Improve JAVA and SCALA logical plans ...

2017-07-13 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3943
  
Merging...




[GitHub] flink issue #4266: [FLINK-6232][Table&Sql] support proctime inner windowed s...

2017-07-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4266
  
Thanks for the update @hongyuhong!
I will take this PR from here. The logic looks very good, but I would like 
to refactor some parts (mainly the `WindowJoinUtil`).

I will open a new PR with your work and my commit on top, probably later 
today. 
It would be great if you could review and check my PR.

@wuchong your review is of course also highly welcome :-)

Thank you, Fabian




[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085323#comment-16085323
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4266
  
Thanks for the update @hongyuhong!
I will take this PR from here. The logic looks very good, but I would like 
to refactor some parts (mainly the `WindowJoinUtil`).

I will open a new PR with your work and my commit on top, probably later 
today. 
It would be great if you could review and check my PR.

@wuchong your review is of course also highly welcome :-)

Thank you, Fabian


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; unbounded 
> conditions like {{o.proctime > s.proctime}} are not supported, and the condition 
> must include the proctime attributes of both streams, so {{o.proctime BETWEEN 
> proctime() AND proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 





[jira] [Commented] (FLINK-6936) Add multiple targets support for custom partitioner

2017-07-13 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085352#comment-16085352
 ] 

Xingcan Cui commented on FLINK-6936:


Hi [~aljoscha], I have started a PR for that. I wonder if you could help review 
it. Thanks.

> Add multiple targets support for custom partitioner
> ---
>
> Key: FLINK-6936
> URL: https://issues.apache.org/jira/browse/FLINK-6936
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>
> The current user-facing Partitioner only allows returning one target.
> {code:java}
> @Public
> public interface Partitioner<K> extends java.io.Serializable, Function {
>   /**
>* Computes the partition for the given key.
>*
>* @param key The key.
>* @param numPartitions The number of partitions to partition into.
>* @return The partition index.
>*/
>   int partition(K key, int numPartitions);
> }
> {code}
> Actually, this function should be able to return multiple partitions; the 
> single-target restriction may be a historical legacy.
> There could be at least three approaches to solve this.
> # Make the `protected DataStream<T> setConnectionType(StreamPartitioner<T> 
> partitioner)` method in DataStream public, which would allow users to directly 
> define a StreamPartitioner.
> # Change the `partition` method in the Partitioner interface to return an int 
> array instead of a single int value.
> # Add a new `multicast` method to DataStream and provide a MultiPartitioner 
> interface which returns an int array.
> Considering the consistency of the API, the 3rd approach seems to be an 
> acceptable choice. [~aljoscha], what do you think?
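
A hypothetical sketch of the 3rd approach. The `MultiPartitioner` name and
signature are assumptions drawn from the description above, not an existing API:
{code}
@Public
public interface MultiPartitioner<K> extends java.io.Serializable, Function {

  /**
   * Computes the target partitions for the given key.
   *
   * @param key The key.
   * @param numPartitions The number of partitions to partition into.
   * @return The indices of all target partitions.
   */
  int[] partitions(K key, int numPartitions);
}
{code}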





[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126671328
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, 
WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related 
operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
+val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+if (!isLeftAppendOnly || !isRightAppendOnly) {
+  throw new TableException(
+"Windowed stream join does not support updates.")
+}
+
+val leftDataStream = 
left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+val rightDataStream = 
right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+
+// get the equality keys and other condition
+val joinInfo = JoinInfo.of(leftNode, rightNode, joinCondition)
+val leftKeys = joinInfo.leftKeys.toIntArray
+val rightKeys = joinInfo.rightKeys.toIntArray
+
+// generate join function
+val joinFunction =
+WindowJoinUtil.generateJoinFunction(
+  config,
+  joinType,
+  leftSchema.physicalTypeInfo,
+  rightSchema.physicalTypeInfo,
+  schema,
+  remainCondition,
+  ruleDescript

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127153991
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction to support stream-to-stream join; currently only
+  * inner join is supported.
+  *
+  * @param leftLowerBound
+  *        the left stream lower bound, and -leftLowerBound is the right
+  *        stream upper bound
+  * @param leftUpperBound
+  *        the left stream upper bound, and -leftUpperBound is the right
+  *        stream lower bound
+  * @param element1Type  the input type of the left stream
+  * @param element2Type  the input type of the right stream
+  * @param genJoinFuncName  the name of the generated function for the non-equi condition
+  * @param genJoinFuncCode  the code of the generated function for the non-equi condition
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](element1Type)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asIns

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126680044
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, 
TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A util class to help analyze and build join code.
+  */
+object WindowJoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
--- End diff --

minor typo: condtion -> condition




[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126683783
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+(1L, 1, "Hello"),
+(2L, 2, "Hello"),
+(3L, 3, "Hello"),
+(4L, 4, "Hello"),
+(5L, 5, "Hello"),
+(6L, 6, "Hello"),
+(7L, 7, "Hello World"),
+(8L, 8, "Hello World"),
+(20L, 20, "Hello World"))
+
+  /** test process time inner join **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+
+val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on 
t1.a = t2.a and " +
+  "t1.proctime between t2.proctime - interval '5' second and 
t2.proctime + interval '5' second"
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+  }
+
+  /** test process time inner join with other condition **/
+  @Test
+  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
--- End diff --

You can simply do `StreamITCase.clear` instead of this.




[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126684002
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
--- End diff --

Looks like the `data` is never used; can we remove it?




[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126671128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
--- End diff --

remove unused import




[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126680468
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, 
TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * An util class to help analyze and build join code .
+  */
+object WindowJoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return remain condition.
+*
+* @param  condition   join condition
+* @param  leftLogicalFieldCnt left stream logical field num
+* @param  inputSchema join rowtype schema
+* @param  rexBuilder  util to build rexNode
+* @param  config  table environment config
+* @return isRowTime, left lower boundary, right lower boundary, remain 
condition
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  inputSchema: RowSchema,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+
+// Converts the condition to conjunctive normal form (CNF)
+val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+
+// split the condition into time indicator condition and other 
condition
+val (timeTerms, remainTerms) = cnfCondition match {
+  case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+c.getOperands.asScala
+  .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, 
inputSchema.logicalType))
+  .reduceLeft((l, r) => {
+(l._1 ++ r._1, l._2 ++ r._2)
+  })
+  case _ =>
+throw new TableException("A time-based stream join requires 
exactly " +
+  "two join predicates that bound the time in both directions.")
+}
+
+if (timeTerms.size != 2) {
+  throw new TableException("A time-based stream join requires exactly 
" +
+"two join predicates that bound the time in both directions.")
+}
+
+// extract time offset from the time indicator conditon
--- End diff --

minor typo: conditon -> condition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127129047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, 
TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * An util class to help analyze and build join code .
+  */
+object WindowJoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return remain condition.
+*
+* @param  condition   join condition
+* @param  leftLogicalFieldCnt left stream logical field num
+* @param  inputSchema join rowtype schema
+* @param  rexBuilder  util to build rexNode
+* @param  config  table environment config
+* @return isRowTime, left lower boundary, right lower boundary, remain 
condition
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  inputSchema: RowSchema,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+
+// Converts the condition to conjunctive normal form (CNF)
+val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+
+// split the condition into time indicator condition and other 
condition
+val (timeTerms, remainTerms) = cnfCondition match {
+  case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+c.getOperands.asScala
+  .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, 
inputSchema.logicalType))
+  .reduceLeft((l, r) => {
+(l._1 ++ r._1, l._2 ++ r._2)
+  })
+  case _ =>
+throw new TableException("A time-based stream join requires 
exactly " +
+  "two join predicates that bound the time in both directions.")
+}
+
+if (timeTerms.size != 2) {
+  throw new TableException("A time-based stream join requires exactly 
" +
+"two join predicates that bound the time in both directions.")
+}
+
+// extract time offset from the time indicator conditon
+val streamTimeOffsets =
+timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, 
rexBuilder, config))
+
+val (leftLowerBound, leftUpperBound) =
+  streamTimeOffsets match {
+case Seq((x, true), (y, false)) => (x, y)
+case Seq((x, false), (y, true)) => (y, x)
+case _ =>
+  throw new TableException(
+"Time-based join conditions must reference the time attribute 
of both input tables.")
+  }
+
+// compose the remain condition list into one condition
+val remainCondition =
+remainTerms match {
+  case Seq() => None
+  case _ =>
+// Converts logical field references to physical ones.
+Some(remainTerms.map(inputSchema.mapRexNode).reduceLeft((l, r) => {
+  RelOptUtil.andJoinFilters(rexBuilder, l, r)
+}))
+}
+
 

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127142150
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just 
support inner-join
+  *
+  * @param leftLowerBound
+  *the left stream lower bound, and -leftLowerBound is the right 
stream upper bound
+  * @param leftUpperBound
+  *the left stream upper bound, and -leftUpperBound is the right 
stream lower bound
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](element1Type)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asIns
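
To make the bound convention in the class doc above concrete, here is a worked
example (assumed values, for illustration only; not part of the PR):

```scala
// For the test query's condition
//   t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND
//                   AND t2.proctime + INTERVAL '5' SECOND
// the analyzer would presumably produce (in milliseconds):
val leftLowerBound = -5000L // lower bound on (t1.proctime - t2.proctime)
val leftUpperBound = 5000L  // upper bound on (t1.proctime - t2.proctime)

// Mirroring the window-size fields in the quoted class:
val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0  // 5000
val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0  // 5000
```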

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126683700
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+(1L, 1, "Hello"),
+(2L, 2, "Hello"),
+(3L, 3, "Hello"),
+(4L, 4, "Hello"),
+(5L, 5, "Hello"),
+(6L, 6, "Hello"),
+(7L, 7, "Hello World"),
+(8L, 8, "Hello World"),
+(20L, 20, "Hello World"))
+
+  /** test process time inner join **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
--- End diff --

You can simply do `StreamITCase.clear` instead of this.
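
For reference, a minimal sketch of the suggested simplification (this assumes
the `clear` helper on `StreamITCase` in Flink's stream test utils, which the
comment implies exists):

```scala
import scala.collection.mutable

// before: re-assign a fresh result buffer for every test
StreamITCase.testResults = mutable.MutableList()

// after: reset the shared result buffer in place
StreamITCase.clear
```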


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085365#comment-16085365
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126671128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
--- End diff --

remove unused import


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones like {{o.proctime > s.proctime}}, and it must reference the 
> proctime attributes of both streams; {{o.proctime BETWEEN proctime() AND 
> proctime() + 1}} should also not be supported (see the illustrative queries 
> after this description).
> This issue includes:
> * Design of the DataStream operator that implements the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
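
As a quick illustration of the bounded-range restriction above (a sketch only,
reusing the string-SQL entry point from the quoted JoinITCase; the table names
are assumed to be registered as in the issue's example):

{code}
// Bounded in both directions -- should translate:
val bounded = tEnv.sql(
  "SELECT o.orderId FROM Orders AS o JOIN Shipments AS s " +
  "ON o.orderId = s.orderId " +
  "AND o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR " +
  "AND s.proctime + INTERVAL '1' HOUR")

// Unbounded in one direction -- should be rejected at translation time
// (per the quoted WindowJoinUtil: "A time-based stream join requires
// exactly two join predicates that bound the time in both directions."):
// val unbounded = tEnv.sql(
//   "SELECT o.orderId FROM Orders AS o JOIN Shipments AS s " +
//   "ON o.orderId = s.orderId AND o.proctime > s.proctime")
{code}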



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085370#comment-16085370
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127153991
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just 
support inner-join
+  *
+  * @param leftLowerBound
+  *the left stream lower bound, and -leftLowerBound is the right 
stream upper bound
+  * @param leftUpperBound
+  *the left stream upper bound, and -leftUpperBound is the right 
stream lower bound
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1:

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085367#comment-16085367
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126683700
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+(1L, 1, "Hello"),
+(2L, 2, "Hello"),
+(3L, 3, "Hello"),
+(4L, 4, "Hello"),
+(5L, 5, "Hello"),
+(6L, 6, "Hello"),
+(7L, 7, "Hello World"),
+(8L, 8, "Hello World"),
+(20L, 20, "Hello World"))
+
+  /** test process time inner join **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
--- End diff --

You can simply do `StreamITCase.clear` instead of this.


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones like {{o.proctime > s.proctime}}, and it must reference the 
> proctime attributes of both streams; {{o.proctime BETWEEN proctime() AND 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator that implements the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085372#comment-16085372
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126680044
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, 
TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * An util class to help analyze and build join code .
+  */
+object WindowJoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
--- End diff --

minor typo: condtion -> condition


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones like {{o.proctime > s.proctime}}, and it must reference the 
> proctime attributes of both streams; {{o.proctime BETWEEN proctime() AND 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator that implements the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085368#comment-16085368
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127142150
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction to support stream join stream, currently just 
support inner-join
+  *
+  * @param leftLowerBound
+  *the left stream lower bound, and -leftLowerBound is the right 
stream upper bound
+  * @param leftUpperBound
+  *the left stream upper bound, and -leftUpperBound is the right 
stream lower bound
+  * @param element1Type  the input type of left stream
+  * @param element2Type  the input type of right stream
+  * @param genJoinFuncName the function name of the other non-equi condition
+  * @param genJoinFuncCode the function code of the other non-equi condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1:

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085363#comment-16085363
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126680468
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, 
TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * An util class to help analyze and build join code .
+  */
+object WindowJoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return remain condition.
+*
+* @param  condition   join condition
+* @param  leftLogicalFieldCnt left stream logical field num
+* @param  inputSchema join rowtype schema
+* @param  rexBuilder  util to build rexNode
+* @param  config  table environment config
+* @return isRowTime, left lower boundary, right lower boundary, remain 
condition
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  inputSchema: RowSchema,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+
+// Converts the condition to conjunctive normal form (CNF)
+val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+
+// split the condition into time indicator condition and other 
condition
+val (timeTerms, remainTerms) = cnfCondition match {
+  case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+c.getOperands.asScala
+  .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, 
inputSchema.logicalType))
+  .reduceLeft((l, r) => {
+(l._1 ++ r._1, l._2 ++ r._2)
+  })
+  case _ =>
+throw new TableException("A time-based stream join requires 
exactly " +
+  "two join predicates that bound the time in both directions.")
+}
+
+if (timeTerms.size != 2) {
+  throw new TableException("A time-based stream join requires exactly 
" +
+"two join predicates that bound the time in both directions.")
+}
+
+// extract time offset from the time indicator conditon
--- End diff --

minor typo: conditon -> condition


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime,

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085369#comment-16085369
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127129047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.math.{BigDecimal => JBigDecimal}
+import java.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.plan.schema.{RowSchema, 
TimeIndicatorRelDataType}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * An util class to help analyze and build join code .
+  */
+object WindowJoinUtil {
+
+  /**
+* Analyze time-condtion to get time boundary for each stream and get 
the time type
+* and return remain condition.
+*
+* @param  condition   join condition
+* @param  leftLogicalFieldCnt left stream logical field num
+* @param  inputSchema join rowtype schema
+* @param  rexBuilder  util to build rexNode
+* @param  config  table environment config
+* @return isRowTime, left lower boundary, right lower boundary, remain 
condition
+*/
+  private[flink] def analyzeTimeBoundary(
+  condition: RexNode,
+  leftLogicalFieldCnt: Int,
+  inputSchema: RowSchema,
+  rexBuilder: RexBuilder,
+  config: TableConfig): (Boolean, Long, Long, Option[RexNode]) = {
+
+// Converts the condition to conjunctive normal form (CNF)
+val cnfCondition = RexUtil.toCnf(rexBuilder, condition)
+
+// split the condition into time indicator condition and other 
condition
+val (timeTerms, remainTerms) = cnfCondition match {
+  case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+c.getOperands.asScala
+  .map(analyzeCondtionTermType(_, leftLogicalFieldCnt, 
inputSchema.logicalType))
+  .reduceLeft((l, r) => {
+(l._1 ++ r._1, l._2 ++ r._2)
+  })
+  case _ =>
+throw new TableException("A time-based stream join requires 
exactly " +
+  "two join predicates that bound the time in both directions.")
+}
+
+if (timeTerms.size != 2) {
+  throw new TableException("A time-based stream join requires exactly 
" +
+"two join predicates that bound the time in both directions.")
+}
+
+// extract time offset from the time indicator conditon
+val streamTimeOffsets =
+timeTerms.map(x => extractTimeOffsetFromCondition(x._3, x._2, 
rexBuilder, config))
+
+val (leftLowerBound, leftUpperBound) =
+  streamTimeOffsets match {
+case Seq((x, true), (y, false)) => (x, y)
+case Seq((x, false), (y, true)) => (y, x)
+case _ =>
+  throw new TableException(
+"Time-based join conditions must reference the time attribute 
of both input tables.")
+  }
+
+// compose the remain condition list into one condition
+val remainCondition =
+remainTerms match {
+  case Seq() => None
+   

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085371#comment-16085371
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126684002
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
--- End diff --

Looks like the `data` is never used, can we remove it?


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} may only use proctime, which is a system attribute. The 
> time condition only supports bounded time ranges like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> unbounded ones like {{o.proctime > s.proctime}}, and it must reference the 
> proctime attributes of both streams; {{o.proctime BETWEEN proctime() AND 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator that implements the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085364#comment-16085364
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126671328
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, 
WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related 
operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
+val isRightAppendOnly = UpdateCheckUtils.isAppendOnly(right)
+if (!isLeftAppendOnly || !isRightAppendOnly) {
+  throw new TableException(
+"Windowed stream join does not support updates.")
+}
+
+val leftDataStream = 
left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+val rightDataStream = 
right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+
+// get the equality keys and other condition
+val joinInfo = JoinInfo.of(leftNode, rightNode, joinCondition)
+val leftKeys = joinInfo.leftKeys.toIntArray
+val rightKeys = joinInfo.rightKeys.toIntArray
+
+// generate join function
+

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085366#comment-16085366
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r126683783
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/JoinITCase.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.mutable
+
+class JoinITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+(1L, 1, "Hello"),
+(2L, 2, "Hello"),
+(3L, 3, "Hello"),
+(4L, 4, "Hello"),
+(5L, 5, "Hello"),
+(6L, 6, "Hello"),
+(7L, 7, "Hello World"),
+(8L, 8, "Hello World"),
+(20L, 20, "Hello World"))
+
+  /** test process time inner join **/
+  @Test
+  def testProcessTimeInnerJoin(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+
+val sqlQuery = "SELECT t2.a, t2.c, t1.c from T1 as t1 join T2 as t2 on 
t1.a = t2.a and " +
+  "t1.proctime between t2.proctime - interval '5' second and 
t2.proctime + interval '5' second"
+
+val data1 = new mutable.MutableList[(Int, Long, String)]
+data1.+=((1, 1L, "Hi1"))
+data1.+=((1, 2L, "Hi2"))
+data1.+=((1, 5L, "Hi3"))
+data1.+=((2, 7L, "Hi5"))
+data1.+=((1, 9L, "Hi6"))
+data1.+=((1, 8L, "Hi8"))
+
+val data2 = new mutable.MutableList[(Int, Long, String)]
+data2.+=((1, 1L, "HiHi"))
+data2.+=((2, 2L, "HeHe"))
+
+val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 
'proctime.proctime)
+
+tEnv.registerTable("T1", t1)
+tEnv.registerTable("T2", t2)
+
+val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+  }
+
+  /** test process time inner join with other condition **/
+  @Test
+  def testProcessTimeInnerJoinWithOtherCondition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
--- End diff --

You can simply do `StreamITCase.clear` instead of this.


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {co

[GitHub] flink issue #4266: [FLINK-6232][Table&Sql] support proctime inner windowed s...

2017-07-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4266
  
Thanks for the review @wuchong.
I'll address your comments in my upcoming PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085387#comment-16085387
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4266
  
Thanks for the review @wuchong.
I'll address your comments in my upcoming PR.


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-joins on proctime 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply (illustrated in the sketch 
> below):
> * The join hint only supports inner joins
> * The ON clause should include an equi-join condition
> * The time condition {{o.proctime BETWEEN s.proctime AND s.proctime + 
> INTERVAL '1' HOUR}} can only use proctime, which is a system attribute; the 
> time condition only supports a bounded time range like {{o.proctime BETWEEN 
> s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}, not 
> an unbounded one like {{o.proctime > s.proctime}}, and it should include both 
> streams' proctime attributes, so {{o.proctime between proctime() and 
> proctime() + 1}} should also not be supported.
> This issue includes:
> * Design of the DataStream operator to deal with stream joins
> * Translation from Calcite's RelNode representation (LogicalJoin). 
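
To make the bounded-range restriction concrete, two hedged query sketches (table and column names follow the example above):

```scala
// supported: a bounded range over both streams' proctime attributes
val supportedQuery =
  """SELECT o.orderId FROM Orders AS o JOIN Shipments AS s
    |ON o.orderId = s.orderId
    |AND o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR
    |               AND s.proctime + INTERVAL '1' HOUR""".stripMargin

// not supported: an unbounded comparison of the two proctime attributes
val unsupportedQuery =
  """SELECT o.orderId FROM Orders AS o JOIN Shipments AS s
    |ON o.orderId = s.orderId AND o.proctime > s.proctime""".stripMargin
```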



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4241: [FLINK-7015] [streaming] separate OperatorConfig from Str...

2017-07-13 Thread XuPingyong
Github user XuPingyong commented on the issue:

https://github.com/apache/flink/pull/4241
  
Thanks @aljoscha and @StephanEwen, I have updated this PR according to your 
advice.

- Rename StreamConfig to StreamTaskConfig.

- Introduce serialisable OperatorConfig and move only those fields that are 
tied to one operator within the chain from StreamTaskConfig to OperatorConfig. 
Initialize the operator with an OperatorConfig.

- Introduce OperatorContext, an interface that provides a view of parts of the 
OperatorConfig. It is used for setting up an operator.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7015) Separate OperatorConfig from StreamConfig

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085393#comment-16085393
 ] 

ASF GitHub Bot commented on FLINK-7015:
---

Github user XuPingyong commented on the issue:

https://github.com/apache/flink/pull/4241
  
Thanks @aljoscha and @StephanEwen, I have updated this PR according to your 
advice.

- Rename StreamConfig to StreamTaskConfig.

- Introduce serialisable OperatorConfig and move only those fields that are 
tied to one operator within the chain from StreamTaskConfig to OperatorConfig. 
Initialize the operator with an OperatorConfig.

- Introduce OperatorContext, an interface that provides a view of parts of the 
OperatorConfig. It is used for setting up an operator.



> Separate OperatorConfig from StreamConfig
> -
>
> Key: FLINK-7015
> URL: https://issues.apache.org/jira/browse/FLINK-7015
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
>  Motivation:
> A Task contains one or more operators via chaining; however, the 
> configs of the operator and the task are all put in StreamConfig. For example, when an 
> operator is set up with the StreamConfig, it can see interfaces about 
> physicalEdges or chained.task.configs, which is confusing. Similarly, a 
> streamTask should not see the interface about chain.index.
>  So we need to separate OperatorConfig from StreamConfig. A 
> streamTask builds its execution environment from the streamConfig, extracts 
> operatorConfigs from it, and then builds a streamOperator from every 
> operatorConfig. 
> 
> OperatorConfig: what the streamOperator is set up with; it contains 
> information that belongs only to that streamOperator:
>    1) operator information: name, id
>    2) serialized StreamOperator
>    3) input serializer
>    4) output edges and serializers
>    5) chain.index
>    6) state.key.serializer
>  StreamConfig: for the streamTask to use:
>    1) in.physical.edges
>    2) out.physical.edges
>    3) chained OperatorConfigs
>    4) execution environment: checkpoint, state.backend, and so on... 
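
A rough sketch of the proposed split (the field names and types below are illustrative assumptions, not the actual Flink classes):

```scala
// Per-operator settings: everything tied to a single operator in the chain.
case class OperatorConfig(
  operatorName: String,
  operatorId: Int,
  serializedOperator: Array[Byte], // the serialized StreamOperator
  chainIndex: Int,
  stateKeySerializerClass: String)

// Task-wide settings: the task builds its environment from these and then
// constructs one streamOperator per chained OperatorConfig.
case class StreamTaskConfig(
  inPhysicalEdges: Seq[String],
  outPhysicalEdges: Seq[String],
  chainedOperatorConfigs: Seq[OperatorConfig],
  checkpointingEnabled: Boolean,
  stateBackendClass: String)
```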



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085404#comment-16085404
 ] 

Yueting Chen commented on FLINK-7169:
-

[~jark] Thanks.

I just found out that the events stored in the SharedBuffer may have the same 
timestamp, which makes it impossible to determine which event should be 
discarded. I think we need to introduce a new variable to keep the logical 
order of the events.

I'll create a separate JIRA to address this issue.

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's 
> how the CEP library currently behaves.
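
As a hedged usage sketch of the proposed overload (the `AfterMatchSkipStrategy.skipPastLastRow()` factory is an assumption; only the three-argument `pattern` signature comes from the proposal above):

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.fromElements("a1", "a2", "a3", "a4")

// a{2}: two consecutive events mapped to the pattern variable "a"
val pattern = Pattern.begin[String]("a").where(_.startsWith("a")).times(2)

// hypothetical third argument; AfterMatchSkipStrategy does not exist in the
// library yet, it is what this issue proposes to add
val matches = CEP.pattern(input, pattern, AfterMatchSkipStrategy.skipPastLastRow())
```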



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127165003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

We should use `DataStreamRetractionRules.isAccRetract(input)` to check 
whether the input will produce updates.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085408#comment-16085408
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127165003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

We should use `DataStreamRetractionRules.isAccRetract(input)` to check 
whether the input will produce updates.  


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> S

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085409#comment-16085409
 ] 

Dawid Wysakowicz commented on FLINK-7169:
-

[~ychen] There is already such variable called counter.

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's 
> how the CEP library currently behaves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085409#comment-16085409
 ] 

Dawid Wysakowicz edited comment on FLINK-7169 at 7/13/17 9:07 AM:
--

[~ychen] There is already such variable called {{counter}}.


was (Author: dawidwys):
[~ychen] There is already such variable called counter.

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's 
> how the CEP library currently behaves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127166606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

`isAccRetract` only checks how updates are encoded but not whether there 
are updates.
The current approach is correct, IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085417#comment-16085417
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127166606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode which matches along with JoinOperator and its related operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

`isAccRetract` only checks how updates are encoded but not whether there 
are updates.
The current approach is correct, IMO.


> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL: https://issues.apache.org/jira/browse/FLINK-6232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported

[jira] [Created] (FLINK-7170) Fix until condition when the contiguity is strict

2017-07-13 Thread Dian Fu (JIRA)
Dian Fu created FLINK-7170:
--

 Summary: Fix until condition when the contiguity is strict
 Key: FLINK-7170
 URL: https://issues.apache.org/jira/browse/FLINK-7170
 Project: Flink
  Issue Type: Bug
Reporter: Dian Fu
Assignee: Dian Fu


When the contiguity is {{STRICT}}, the method {{extendWithUntilCondition}} is 
not correct.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7170) Fix until condition when the contiguity is strict

2017-07-13 Thread Dian Fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-7170:
---
Component/s: CEP

> Fix until condition when the contiguity is strict
> -
>
> Key: FLINK-7170
> URL: https://issues.apache.org/jira/browse/FLINK-7170
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> When the contiguity is {{STRICT}}, the method {{extendWithUntilCondition}} is 
> not correct.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4317: [FLINK-5987] [build] Upgrade zookeeper dependency ...

2017-07-13 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4317

[FLINK-5987] [build] Upgrade zookeeper dependency to 3.4.10



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink 
flink-5987-Upgrade_zookeeper_dependency_to_3.4.10

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4317.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4317


commit 5098883ea6ecede9ff9095cde5a21e8062981862
Author: zhangminglei 
Date:   2017-07-13T09:20:06Z

[FLINK-5987] [build] Upgrade zookeeper dependency to 3.4.10




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4318: [FLINK-7170] [cep] Fix until condition when the co...

2017-07-13 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4318

[FLINK-7170] [cep] Fix until condition when the contiguity is strict

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink fix-until-condition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4318.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4318


commit 605311b8db8669ab1086eebc953892f90eb27ffa
Author: Dian Fu 
Date:   2017-07-13T09:21:30Z

[FLINK-7170] [cep] Fix until condition when the contiguity is strict




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7170) Fix until condition when the contiguity is strict

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085425#comment-16085425
 ] 

ASF GitHub Bot commented on FLINK-7170:
---

GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4318

[FLINK-7170] [cep] Fix until condition when the contiguity is strict

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink fix-until-condition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4318.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4318


commit 605311b8db8669ab1086eebc953892f90eb27ffa
Author: Dian Fu 
Date:   2017-07-13T09:21:30Z

[FLINK-7170] [cep] Fix until condition when the contiguity is strict




> Fix until condition when the contiguity is strict
> -
>
> Key: FLINK-7170
> URL: https://issues.apache.org/jira/browse/FLINK-7170
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> When the contiguity is {{STRICT}}, the method {{extendWithUntilCondition}} is 
> not correct.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4306: [FLINK-7034] Bump Dropwizard Metrics version to 3....

2017-07-13 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4306


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5987) Upgrade zookeeper dependency to 3.4.10

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085433#comment-16085433
 ] 

ASF GitHub Bot commented on FLINK-5987:
---

Github user zhangminglei closed the pull request at:

https://github.com/apache/flink/pull/4317


> Upgrade zookeeper dependency to 3.4.10
> --
>
> Key: FLINK-5987
> URL: https://issues.apache.org/jira/browse/FLINK-5987
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> zookeeper 3.4.8 has been released.
> Among the fixes the following are desirable:
> ZOOKEEPER-706 large numbers of watches can cause session re-establishment to 
> fail 
> ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll
> This issue upgrades the zookeeper dependency to 3.4.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7034) GraphiteReporter cannot recover from lost connection

2017-07-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-7034:
---

Assignee: Aljoscha Krettek

> GraphiteReporter cannot recover from lost connection
> 
>
> Key: FLINK-7034
> URL: https://issues.apache.org/jira/browse/FLINK-7034
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Aleksandr
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Now Flink uses metric version 1.3.0 in which there is a 
> [Bug|https://github.com/dropwizard/metrics/issues/694]. I think you should 
> use version 1.3.1 or higher



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7034) GraphiteReporter cannot recover from lost connection

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085432#comment-16085432
 ] 

ASF GitHub Bot commented on FLINK-7034:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4306


> GraphiteReporter cannot recover from lost connection
> 
>
> Key: FLINK-7034
> URL: https://issues.apache.org/jira/browse/FLINK-7034
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Aleksandr
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Now Flink uses metric version 1.3.0 in which there is a 
> [Bug|https://github.com/dropwizard/metrics/issues/694]. I think you should 
> use version 1.3.1 or higher



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4317: [FLINK-5987] [build] Upgrade zookeeper dependency ...

2017-07-13 Thread zhangminglei
Github user zhangminglei closed the pull request at:

https://github.com/apache/flink/pull/4317


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7034) GraphiteReporter cannot recover from lost connection

2017-07-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7034:

Fix Version/s: 1.3.2
   1.4.0

> GraphiteReporter cannot recover from lost connection
> 
>
> Key: FLINK-7034
> URL: https://issues.apache.org/jira/browse/FLINK-7034
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Aleksandr
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Now Flink uses metric version 1.3.0 in which there is a 
> [Bug|https://github.com/dropwizard/metrics/issues/694]. I think you should 
> use version 1.3.1 or higher



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7034) GraphiteReporter cannot recover from lost connection

2017-07-13 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7034.
---
Resolution: Fixed

Fixed on master in
b3b142283b7eec297eebbd18e2b534a78553b69c

Fixed on release-1.3 in
bf06171198e13299396a80cd777563f6e3185c02

> GraphiteReporter cannot recover from lost connection
> 
>
> Key: FLINK-7034
> URL: https://issues.apache.org/jira/browse/FLINK-7034
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Aleksandr
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Now Flink uses metric version 1.3.0 in which there is a 
> [Bug|https://github.com/dropwizard/metrics/issues/694]. I think you should 
> use version 1.3.1 or higher



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4306: [FLINK-7034] Bump Dropwizard Metrics version to 3.2.3

2017-07-13 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4306
  
Thanks for reviewing! 😃 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7034) GraphiteReporter cannot recover from lost connection

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085440#comment-16085440
 ] 

ASF GitHub Bot commented on FLINK-7034:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4306
  
Thanks for reviewing! 😃 



> GraphiteReporter cannot recover from lost connection
> 
>
> Key: FLINK-7034
> URL: https://issues.apache.org/jira/browse/FLINK-7034
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Aleksandr
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Now Flink uses metric version 1.3.0 in which there is a 
> [Bug|https://github.com/dropwizard/metrics/issues/694]. I think you should 
> use version 1.3.1 or higher



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4319: [FLINK-5987] [build] Upgrade zookeeper dependency ...

2017-07-13 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4319

[FLINK-5987] [build] Upgrade zookeeper dependency to 3.4.10



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink 
flink-5987-Upgrade_zookeeper_dependency_to_3.4.10

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4319.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4319


commit 3bd3bdf9ef2ca47448513f4e354ecdd949c1a251
Author: zhangminglei 
Date:   2017-07-13T09:20:06Z

[FLINK-5987] [build] Upgrade zookeeper dependency to 3.4.10




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5987) Upgrade zookeeper dependency to 3.4.10

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085445#comment-16085445
 ] 

ASF GitHub Bot commented on FLINK-5987:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4319

[FLINK-5987] [build] Upgrade zookeeper dependency to 3.4.10



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink 
flink-5987-Upgrade_zookeeper_dependency_to_3.4.10

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4319.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4319


commit 3bd3bdf9ef2ca47448513f4e354ecdd949c1a251
Author: zhangminglei 
Date:   2017-07-13T09:20:06Z

[FLINK-5987] [build] Upgrade zookeeper dependency to 3.4.10




> Upgrade zookeeper dependency to 3.4.10
> --
>
> Key: FLINK-5987
> URL: https://issues.apache.org/jira/browse/FLINK-5987
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>
> zookeeper 3.4.8 has been released.
> Among the fixes the following are desirable:
> ZOOKEEPER-706 large numbers of watches can cause session re-establishment to 
> fail 
> ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll
> This issue upgrades the zookeeper dependency to 3.4.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-07-13 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dawidwys Could you help to take a look at this PR? Thanks a lot in advance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7147) Support greedy quantifier in CEP

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085449#comment-16085449
 ] 

ASF GitHub Bot commented on FLINK-7147:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dawidwys Could you help to take a look at this PR? Thanks a lot in advance.


> Support greedy quantifier in CEP
> 
>
> Key: FLINK-7147
> URL: https://issues.apache.org/jira/browse/FLINK-7147
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> A greedy quantifier will try to match the token as many times as possible. For 
> example, for the pattern {{a b* c}} (skip till next is used) and the inputs {{a b1 
> b2 c}}, if the quantifier for {{b}} is greedy, it will only output {{a b1 b2 c}}.
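
As a hedged sketch of the proposed behaviour (the trailing `greedy` combinator is the new API this issue is about, not an existing call):

```scala
import org.apache.flink.cep.scala.pattern.Pattern

// pattern "a b* c" with skip-till-next contiguity (followedBy); with a greedy
// b*, only the longest match {a, b1, b2, c} is emitted, not also {a, c} or
// {a, b1, c}
val pattern = Pattern.begin[String]("a").where(_ == "a")
  .followedBy("b").where(_.startsWith("b")).oneOrMore.optional.greedy
  .followedBy("c").where(_ == "c")
```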



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085451#comment-16085451
 ] 

Yueting Chen commented on FLINK-7169:
-

[~dawidwys]
Thanks for the information.
I noticed that. But the counter only keeps the reference count of the value, so it 
has some limitations. For example:
We need to match a{2} from a1,a2,a3,a4, with the AFTER MATCH SKIP PAST LAST ROW 
option.
Without a doubt, the first match is a1,a2. According to the SKIP option, we need 
to start the second match from a3.
But it's possible that all four events have the same timestamp (1, for 
example). After the first match, we can only get the timestamp of the final 
matched event. Without knowing the exact logical order of the four events, it's 
impossible to skip to the right position, and the counter can't help with that.
What I suggest is to keep a logical id for each event; it does not need to be 
continuous, but it should be monotonically increasing.
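
A minimal sketch of the suggested bookkeeping (the wrapper and counter are illustrative, not the actual SharedBuffer change):

```scala
// Give each buffered event a logical sequence id so that events sharing a
// timestamp still have a total order to resume matching against.
final case class SequencedEvent[T](event: T, timestamp: Long, logicalId: Long)

final class EventSequencer[T] {
  private var nextId: Long = 0L // monotonically increasing; gaps would be fine

  def wrap(event: T, timestamp: Long): SequencedEvent[T] = {
    val wrapped = SequencedEvent(event, timestamp, nextId)
    nextId += 1
    wrapped
  }
}
```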

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's 
> how the CEP library currently behaves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5987) Upgrade zookeeper dependency to 3.4.10

2017-07-13 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-5987:
---

Assignee: mingleizhang

> Upgrade zookeeper dependency to 3.4.10
> --
>
> Key: FLINK-5987
> URL: https://issues.apache.org/jira/browse/FLINK-5987
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> zookeeper 3.4.8 has been released.
> Among the fixes the following are desirable:
> ZOOKEEPER-706 large numbers of watches can cause session re-establishment to 
> fail 
> ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll
> This issue upgrades the zookeeper dependency to 3.4.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085451#comment-16085451
 ] 

Yueting Chen edited comment on FLINK-7169 at 7/13/17 9:38 AM:
--

[~dawidwys]
Thanks for the information.
I noticed that. But the counter only keeps the reference count of the value, so it 
has some limitations. For example:
We need to match {{a{2}}} from {{a1,a2,a3,a4}}, with the AFTER MATCH SKIP PAST 
LAST ROW option.
Without a doubt, the first match is {{a1,a2}}. According to the SKIP option, we 
need to start the second match from a3.
But it's possible that all four events have the same timestamp (1, for 
example). After the first match, we can only get the timestamp of the final 
matched event. Without knowing the exact logical order of the four events, it's 
impossible to skip to the right position, and the counter can't help with that.
What I suggest is to keep a logical id for each event; it does not need to be 
continuous, but it should be monotonically increasing.


was (Author: ychen):
[~dawidwys]
Thanks for the information.
I noticed that. But the counter only keeps the reference count of the value, so it 
has some limitations. For example:
We need to match a{2} from a1,a2,a3,a4, with the AFTER MATCH SKIP PAST LAST ROW 
option.
Without a doubt, the first match is a1,a2. According to the SKIP option, we need 
to start the second match from a3.
But it's possible that all four events have the same timestamp (1, for 
example). After the first match, we can only get the timestamp of the final 
matched event. Without knowing the exact logical order of the four events, it's 
impossible to skip to the right position, and the counter can't help with that.
What I suggest is to keep a logical id for each event; it does not need to be 
continuous, but it should be monotonically increasing.

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's 
> how the CEP library currently behaves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4311: [FLINK-7162] [test] Tests should not write outside 'targe...

2017-07-13 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4311
  
Thanks, zentol. I will fix it using ```TemporaryFolder``` soon.
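
A minimal sketch of the JUnit `TemporaryFolder` pattern in a Scala test (class and method names are illustrative):

```scala
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}

class TempDirTest {
  private val _tempFolder = new TemporaryFolder()

  // JUnit requires @Rule on a public field or method; in Scala a def works.
  @Rule def tempFolder: TemporaryFolder = _tempFolder

  @Test
  def writesOnlyUnderManagedTempDir(): Unit = {
    val dir = tempFolder.newFolder() // created and deleted by JUnit per test
    // ... exercise the code under test so that it writes only under `dir` ...
  }
}
```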


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7162) Tests should not write outside 'target' directory.

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085456#comment-16085456
 ] 

ASF GitHub Bot commented on FLINK-7162:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4311
  
Thanks, zentol. I will fix it using ```TemporaryFolder``` soon.


> Tests should not write outside 'target' directory.
> --
>
> Key: FLINK-7162
> URL: https://issues.apache.org/jira/browse/FLINK-7162
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> A few tests use Files.createTempDir() from the Guava package, but do not set 
> the java.io.tmpdir system property. Thus the temp directory is created in 
> unpredictable places and is not cleaned up by {{mvn clean}}.
> This was probably introduced in {{JobManagerStartupTest}} and then replicated 
> in {{BlobUtilsTest}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4149: [FLINK-6923] [Kafka Connector] Expose in-processing/in-fl...

2017-07-13 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4149
  
Just a quick comment: without a test or a very explicit comment about why 
these fields are there, they might get "fixed away" in the future by someone who 
notices that there are unused fields.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP

2017-07-13 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dianfu Sorry for the delay, but unfortunately I will not have enough time 
for a proper review before my vacation. I will get back to it after 24.07.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7147) Support greedy quantifier in CEP

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085458#comment-16085458
 ] 

ASF GitHub Bot commented on FLINK-7147:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4296
  
@dianfu Sorry for the delay, but unfortunately I will not have enough time 
for a proper review before my vacation. I will get back to it after 24.07.


> Support greedy quantifier in CEP
> 
>
> Key: FLINK-7147
> URL: https://issues.apache.org/jira/browse/FLINK-7147
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> A greedy quantifier will try to match the token as many times as possible. For 
> example, for the pattern {{a b* c}} (skip till next is used) and the inputs {{a b1 
> b2 c}}, if the quantifier for {{b}} is greedy, it will only output {{a b1 b2 c}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6923) Kafka connector needs to expose information about in-flight record in AbstractFetcher base class

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085459#comment-16085459
 ] 

ASF GitHub Bot commented on FLINK-6923:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4149
  
Just a quick comment: without a test or a very explicit comment about why 
these fields are there, they might get "fixed away" in the future by someone 
who notices that they are unused.


> Kafka connector needs to expose information about in-flight record in 
> AbstractFetcher base class
> 
>
> Key: FLINK-6923
> URL: https://issues.apache.org/jira/browse/FLINK-6923
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Zhenzhong Xu
>Assignee: Zhenzhong Xu
>Priority: Minor
>
> We have a use case with a custom Fetcher implementation that extends the 
> AbstractFetcher base class. We need to periodically get the partition and 
> offset information of the records currently in flight (in processing). 
> This can easily be exposed in the AbstractFetcher class.
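
A hypothetical sketch of the kind of accessor being asked for (all names below are made up for illustration; this is not the actual AbstractFetcher API):

{code}
// Made-up names; only illustrates the shape of the requested accessor.
abstract class FetcherSketch[T] {
  @volatile protected var inFlightPartition: Int = -1
  @volatile protected var inFlightOffset: Long = -1L

  /** Partition and offset of the record currently being processed. */
  def currentInFlightPosition: (Int, Long) = (inFlightPartition, inFlightOffset)

  /** The fetch loop would call this before handing a record downstream. */
  protected def markInFlight(partition: Int, offset: Long): Unit = {
    inFlightPartition = partition
    inFlightOffset = offset
  }
}
{code}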



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085451#comment-16085451
 ] 

Yueting Chen edited comment on FLINK-7169 at 7/13/17 9:43 AM:
--

[~dawidwys]
Thanks for the information.
I noticed that. But the counter only keeps the reference count of the value, so it 
has some limitations. For example:
We need to match {{a\{2\}}} from {{a1,a2,a3,a4}} with the AFTER MATCH SKIP 
PAST LAST ROW option.
Without a doubt, the first match is {{a1,a2}}. According to the SKIP option, we 
need to start the second match from a3.
But it is possible that all four events have the same timestamp (1, for 
example). After the first match, we can only get the timestamp of the final 
matched event. Without knowing the exact logical order of the four events, it is 
impossible to skip to the right position, and the counter cannot help with that.
What I suggest is to keep a logical id for each event; it does not need to be 
contiguous, but it should be monotonically increasing.
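
A minimal sketch of that idea (types and names are illustrative only): tag each event with a monotonically increasing sequence number, so that events with colliding timestamps are still totally ordered and the engine can resume at an exact position.

{code}
final case class Tagged[T](logicalId: Long, timestamp: Long, event: T)

final class LogicalIdTagger[T] {
  private var nextId = 0L

  def tag(event: T, timestamp: Long): Tagged[T] = {
    nextId += 1 // strictly increasing; gaps would also be fine
    Tagged(nextId, timestamp, event)
  }
}

// Even if a1..a4 all carry timestamp 1, their logical ids 1..4 make
// "skip past the last row of {a1,a2}" resolve unambiguously to a3.
{code}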



> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class that takes an 
> additional AfterMatchSkipStrategy parameter.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, 
> Pattern<T, ?> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how 
> the CEP library currently behaves.
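
To make the first two options concrete, a small worked illustration (comment-only sketch; the matches shown are my reading of the semantics above):

{code}
// Matches of a pattern equivalent to a{2} over the input a1 a2 a3 a4:
//
//   AFTER MATCH SKIP TO NEXT ROW:   {a1,a2}, {a2,a3}, {a3,a4}
//     (after {a1,a2}, resume at a2, the row after the match's first row)
//
//   AFTER MATCH SKIP PAST LAST ROW: {a1,a2}, {a3,a4}
//     (after {a1,a2}, resume at a3, the row after the match's last row)
object AfterMatchSkipIllustration {
  val input = Seq("a1", "a2", "a3", "a4")
}
{code}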



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127173843
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner join is supported.
+  *
+  * @param leftLowerBound
+  *the left stream lower bound, and -leftLowerBound is the right stream upper bound
+  * @param leftUpperBound
+  *the left stream upper bound, and -leftUpperBound is the right stream lower bound
+  * @param element1Type  the input type of the left stream
+  * @param element2Type  the input type of the right stream
+  * @param genJoinFuncName the function name of the non-equi join condition
+  * @param genJoinFuncCode the generated code of the non-equi join condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](element1Type)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asIns

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085469#comment-16085469
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127173843
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner join is supported.
+  *
+  * @param leftLowerBound
+  *the left stream lower bound, and -leftLowerBound is the right stream upper bound
+  * @param leftUpperBound
+  *the left stream upper bound, and -leftUpperBound is the right stream lower bound
+  * @param element1Type  the input type of the left stream
+  * @param element2Type  the input type of the right stream
+  * @param genJoinFuncName the function name of the non-equi join condition
+  * @param genJoinFuncCode the generated code of the non-equi join condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1:

[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127173868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, 
WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode that represents a window join of two streams and its related operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

The following SQL `select a, sum(b), a+1 from t1 group by a` will be optimized 
into the following nodes:

```
DataStreamCalc (AccRetract, producesUpdates=false)
  DataStreamGroupAggregate (AccRetract, producesUpdates=true)
    DataStreamScan (Acc, producesUpdates=false)
```
The DataStreamCalc is append-only but is in AccRetract mode, which means 
its output contains retractions. 

I think we want to check whether the input contains retractions, right? 
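
To make the distinction concrete for readers following along, a toy model (the `NodeInfo` type and its flags are made up, not Flink's planner API): a node can itself be append-only while its input still carries retractions.

```scala
// Hypothetical model of the plan above; not Flink's actual planner API.
case class NodeInfo(name: String, producesUpdates: Boolean, inputs: Seq[NodeInfo]) {
  // The check suggested above: does anything upstream produce updates,
  // i.e. can this node's input contain retraction messages?
  def inputContainsRetraction: Boolean =
    inputs.exists(n => n.producesUpdates || n.inputContainsRetraction)
}

object RetractionCheckSketch extends App {
  val scan = NodeInfo("DataStreamScan", producesUpdates = false, Nil)
  val agg  = NodeInfo("DataStreamGroupAggregate", producesUpdates = true, Seq(scan))
  val calc = NodeInfo("DataStreamCalc", producesUpdates = false, Seq(agg))

  println(calc.producesUpdates)         // false: the Calc itself is append-only
  println(calc.inputContainsRetraction) // true: its input carries retractions
}
```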





---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085470#comment-16085470
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127173868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, 
StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.{ProcTimeWindowInnerJoin, 
WindowJoinUtil}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.updateutils.UpdateCheckUtils
+
+/**
+  * Flink RelNode that represents a window join of two streams and its related operations.
+  */
+class DataStreamWindowJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+joinCondition: RexNode,
+joinType: JoinRelType,
+leftSchema: RowSchema,
+rightSchema: RowSchema,
+schema: RowSchema,
+isRowTime: Boolean,
+leftLowerBound: Long,
+leftUpperBound: Long,
+remainCondition: Option[RexNode],
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+with CommonJoin
+with DataStreamRel {
+
+  override def deriveRowType() = schema.logicalType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamWindowJoin(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  joinCondition,
+  joinType,
+  leftSchema,
+  rightSchema,
+  schema,
+  isRowTime,
+  leftLowerBound,
+  leftUpperBound,
+  remainCondition,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+joinToString(
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+joinExplainTerms(
+  super.explainTerms(pw),
+  schema.logicalType,
+  joinCondition,
+  joinType,
+  getExpressionString)
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val config = tableEnv.getConfig
+
+val isLeftAppendOnly = UpdateCheckUtils.isAppendOnly(left)
--- End diff --

The following SQL `select a, sum(b), a+1 from t1 group by a` will be optimized 
into the following nodes:

```
DataStreamCalc (AccRetract, producesUpdates=false)
  DataStreamGroupAggregate (AccRetract, producesUpdates=true)
    DataStreamScan (Acc, producesUpdates=false)
```
The DataStreamCalc is append-only but is in AccRetract mode, which means 
its output contains retractions. 

I think we want to check whether the input contains retractions, right? 





> Support proctime inner equi-join between two streams in the SQL API
> ---
>
> Key: FLINK-6232
> URL
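
For context, the kind of query this issue targets looks like this (table and column names are made up):

{code}
// Illustrative query (made-up table and column names) for a proctime-bounded
// inner equi-join of two streams, as it might be passed to the Table API's
// sql() method.
object ProcTimeJoinQuery {
  val sql: String =
    """
      |SELECT o.orderId, p.payId
      |FROM Orders AS o JOIN Payments AS p
      |  ON o.orderId = p.orderId AND
      |     o.proctime BETWEEN p.proctime - INTERVAL '5' MINUTE AND p.proctime
      |""".stripMargin
}
{code}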

[GitHub] flink issue #4305: [FLINK-7161] fix misusage of Float.MIN_VALUE and Double.M...

2017-07-13 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4305
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6902) Activate strict checkstyle for flink-streaming-scala

2017-07-13 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6902.
---
Resolution: Fixed

1.4: 69c8b444e7579dfe6752fa17b4fc1eabe33c3c09

> Activate strict checkstyle for flink-streaming-scala
> 
>
> Key: FLINK-6902
> URL: https://issues.apache.org/jira/browse/FLINK-6902
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7161) Fix misusage of Double.MIN_VALUE and Float.MIN_VALUE

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085477#comment-16085477
 ] 

ASF GitHub Bot commented on FLINK-7161:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4305
  
+1


> Fix misusage of Double.MIN_VALUE and Float.MIN_VALUE
> 
>
> Key: FLINK-7161
> URL: https://issues.apache.org/jira/browse/FLINK-7161
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.1
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> In FloatSummaryAggregator and DoubleSummaryAggregator, we used 
> Float.MIN_VALUE and Double.MIN_VALUE as the initial values for max 
> aggregation, which is wrong: for floating-point types, MIN_VALUE is the 
> smallest positive value, not the most negative one.
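
A minimal sketch in plain Scala (not Flink code) of why that seed is wrong: java.lang.Float.MIN_VALUE is the smallest positive float, so a max over all-negative inputs never beats it.

{code}
object MinValuePitfall extends App {
  val values = Seq(-5.0f, -3.2f, -7.1f)

  // Buggy seed: MIN_VALUE (~1.4e-45) is greater than every negative input.
  val buggyMax = values.foldLeft(java.lang.Float.MIN_VALUE)((a, b) => math.max(a, b))

  // Correct seed: negative infinity (or -java.lang.Float.MAX_VALUE).
  val fixedMax = values.foldLeft(Float.NegativeInfinity)((a, b) => math.max(a, b))

  println(s"buggy: $buggyMax, fixed: $fixedMax") // buggy: 1.4E-45, fixed: -3.2
}
{code}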



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4112: [FLINK-6901] Flip checkstyle configuration files

2017-07-13 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4112


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6901) Flip checkstyle configuration files

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085478#comment-16085478
 ] 

ASF GitHub Bot commented on FLINK-6901:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4112


> Flip checkstyle configuration files
> ---
>
> Key: FLINK-6901
> URL: https://issues.apache.org/jira/browse/FLINK-6901
> Project: Flink
>  Issue Type: Improvement
>  Components: Checkstyle
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> We currently have 2 checkstyle files, {{checkstyle.xml}} as the basic 
> version, and {{strict-checkstyle.xml}} as a heavily expanded version that is 
> applied to most existing modules (see FLINK-6698).
> [~greghogan] suggested to flip the checkstyle while reviewing this PR 
> https://github.com/apache/flink/pull/4086, given that the strict checkstyle 
> is supposed to subsume the existing checkstyle in the long-term.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6901) Flip checkstyle configuration files

2017-07-13 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6901.
---
Resolution: Fixed

1.4: bf0846fc61d38d56a6d0bd690dff64f8ac0a7082

> Flip checkstyle configuration files
> ---
>
> Key: FLINK-6901
> URL: https://issues.apache.org/jira/browse/FLINK-6901
> Project: Flink
>  Issue Type: Improvement
>  Components: Checkstyle
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> We currently have 2 checkstyle files, {{checkstyle.xml}} as the basic 
> version, and {{strict-checkstyle.xml}} as a heavily expanded version that is 
> applied to most existing modules (see FLINK-6698).
> [~greghogan] suggested to flip the checkstyle while reviewing this PR 
> https://github.com/apache/flink/pull/4086, given that the strict checkstyle 
> is supposed to subsume the existing checkstyle in the long-term.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4266: [FLINK-6232][Table&Sql] support proctime inner win...

2017-07-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127176219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner join is supported.
+  *
+  * @param leftLowerBound
+  *the left stream lower bound, and -leftLowerBound is the right stream upper bound
+  * @param leftUpperBound
+  *the left stream upper bound, and -leftUpperBound is the right stream lower bound
+  * @param element1Type  the input type of the left stream
+  * @param element2Type  the input type of the right stream
+  * @param genJoinFuncName the function name of the non-equi join condition
+  * @param genJoinFuncCode the generated code of the non-equi join condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](element1Type)
+val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
+BasicTypeInfo.LONG_TYPE_INFO.asIns

[jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085483#comment-16085483
 ] 

ASF GitHub Bot commented on FLINK-6232:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4266#discussion_r127176219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+  * A CoProcessFunction that joins two streams; currently only inner join is supported.
+  *
+  * @param leftLowerBound
+  *the left stream lower bound, and -leftLowerBound is the right stream upper bound
+  * @param leftUpperBound
+  *the left stream upper bound, and -leftUpperBound is the right stream lower bound
+  * @param element1Type  the input type of the left stream
+  * @param element2Type  the input type of the right stream
+  * @param genJoinFuncName the function name of the non-equi join condition
+  * @param genJoinFuncCode the generated code of the non-equi join condition
+  *
+  */
+class ProcTimeWindowInnerJoin(
+private val leftLowerBound: Long,
+private val leftUpperBound: Long,
+private val element1Type: TypeInformation[Row],
+private val element2Type: TypeInformation[Row],
+private val genJoinFuncName: String,
+private val genJoinFuncCode: String)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]{
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  /** other condition function **/
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  /** tmp list to store expired records **/
+  private var listToRemove: JList[Long] = _
+
+  /** state to hold left stream element **/
+  private var row1MapState: MapState[Long, JList[Row]] = _
+  /** state to hold right stream element **/
+  private var row2MapState: MapState[Long, JList[Row]] = _
+
+  /** state to record last timer of left stream, 0 means no timer **/
+  private var timerState1: ValueState[Long] = _
+  /** state to record last timer of right stream, 0 means no timer **/
+  private var timerState2: ValueState[Long] = _
+
+  private val leftStreamWinSize: Long = if (leftLowerBound < 0) 
-leftLowerBound else 0
+  private val rightStreamWinSize: Long = if (leftUpperBound > 0) 
leftUpperBound else 0
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override def open(config: Configuration) {
+LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
+  s"Code:\n$genJoinFuncCode")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genJoinFuncName,
+  genJoinFuncCode)
+LOG.debug("Instantiating JoinFunction.")
+joinFunction = clazz.newInstance()
+
+listToRemove = new util.ArrayList[Long]()
+cRowWrapper = new CRowWrappingCollector()
+cRowWrapper.setChange(true)
+
+// initialize row state
+val rowListTypeInfo1:

[GitHub] flink pull request #4086: [FLINK-6865] Update checkstyle documentation

2017-07-13 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4086


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6865) Expand checkstyle docs to include import in intellij

2017-07-13 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6865.
---
Resolution: Fixed

1.4: 05beca703d005e2d9db0c441a6d01fa6aa3993b7

> Expand checkstyle docs to include import in intellij
> 
>
> Key: FLINK-6865
> URL: https://issues.apache.org/jira/browse/FLINK-6865
> Project: Flink
>  Issue Type: Improvement
>  Components: Checkstyle
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6865) Expand checkstyle docs to include import in intellij

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085487#comment-16085487
 ] 

ASF GitHub Bot commented on FLINK-6865:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4086


> Expand checkstyle docs to include import in intellij
> 
>
> Key: FLINK-6865
> URL: https://issues.apache.org/jira/browse/FLINK-6865
> Project: Flink
>  Issue Type: Improvement
>  Components: Checkstyle
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7171) Remove identity project for time attributes

2017-07-13 Thread Timo Walther (JIRA)
Timo Walther created FLINK-7171:
---

 Summary: Remove identity project for time attributes
 Key: FLINK-7171
 URL: https://issues.apache.org/jira/browse/FLINK-7171
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther


If only time attributes are projected away, the translated plan should not 
contain an additional Calc node.

Example:
{code}
streamUtil.addTable[(Int, String, Long)](
"MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
{code}

and 

{code}
streamUtil.addTable[(Int, String, Long)](
"MyTable", 'a, 'b, 'c)
{code}

They lead to different logical plans even if these attributes are not accessed in 
{{"SELECT DISTINCT a, b, c FROM MyTable"}}.

{code}
  unaryNode(
"DataStreamGroupAggregate",
unaryNode(
  "DataStreamCalc",
  streamTableNode(0),
  term("select", "a, b, c")
),
term("groupBy", "a, b, c"),
term("select", "a, b, c")
  )
{code}
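
For comparison, the plan one would expect once the identity projection is removed (same test-utility notation as above; this is the desired state, not current behavior):

{code}
  unaryNode(
    "DataStreamGroupAggregate",
    streamTableNode(0),
    term("groupBy", "a, b, c"),
    term("select", "a, b, c")
  )
{code}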



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6695) Activate strict checkstyle in flink-contrib

2017-07-13 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6695.
---
Resolution: Fixed

> Activate strict checkstyle in flink-contrib
> ---
>
> Key: FLINK-6695
> URL: https://issues.apache.org/jira/browse/FLINK-6695
> Project: Flink
>  Issue Type: Sub-task
>  Components: flink-contrib
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16085490#comment-16085490
 ] 

ASF GitHub Bot commented on FLINK-6244:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/4320

[FLINK-6244] Emit timeouted Patterns as Side Output

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink side-outputs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4320


commit 2ae39185facf70ca4e394504b4ac5dc8562570a3
Author: Dawid Wysakowicz 
Date:   2017-07-11T14:52:15Z

[FLINK-6244] Emit timeouted Patterns as Side Output




> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs, I think timed-out patterns should be emitted into 
> them rather than producing a stream of `Either`.
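
For readers unfamiliar with side outputs, a generic, self-contained sketch of the mechanism in the Scala API (this is not the CEP integration the PR adds; the tag name and the even/odd split are made up):

{code}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputSketch extends App {
  // Tag under which "timed out"-style elements are emitted.
  val timedOutTag = OutputTag[String]("timed-out")

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val mainStream = env.fromElements(1, 2, 3, 4).process(
    new ProcessFunction[Int, String] {
      override def processElement(
          value: Int,
          ctx: ProcessFunction[Int, String]#Context,
          out: Collector[String]): Unit = {
        if (value % 2 == 0) out.collect(s"match-$value")
        else ctx.output(timedOutTag, s"timed-out-$value") // to the side output
      }
    })

  mainStream.getSideOutput(timedOutTag).print()
  env.execute("side-output sketch")
}
{code}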



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...

2017-07-13 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/4320

[FLINK-6244] Emit timeouted Patterns as Side Output

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink side-outputs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4320


commit 2ae39185facf70ca4e394504b4ac5dc8562570a3
Author: Dawid Wysakowicz 
Date:   2017-07-11T14:52:15Z

[FLINK-6244] Emit timeouted Patterns as Side Output




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7171) Remove identity project for time attributes

2017-07-13 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-7171:

Affects Version/s: 1.3.1

> Remove identity project for time attributes
> ---
>
> Key: FLINK-7171
> URL: https://issues.apache.org/jira/browse/FLINK-7171
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Timo Walther
>
> If only time attributes are projected away, the translated plan should not 
> contain an additional Calc node.
> Example:
> {code}
> streamUtil.addTable[(Int, String, Long)](
> "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> {code}
> and 
> {code}
> streamUtil.addTable[(Int, String, Long)](
> "MyTable", 'a, 'b, 'c)
> {code}
> They lead to different logical plans even if these attributes are not accessed in 
> {{"SELECT DISTINCT a, b, c FROM MyTable"}}.
> {code}
>   unaryNode(
> "DataStreamGroupAggregate",
> unaryNode(
>   "DataStreamCalc",
>   streamTableNode(0),
>   term("select", "a, b, c")
> ),
> term("groupBy", "a, b, c"),
> term("select", "a, b, c")
>   )
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4320: [FLINK-6244] Emit timeouted Patterns as Side Output

2017-07-13 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4320
  
R: @kl0u 

@aljoscha I think it would be nice if you also had a look, as you spotted 
some problems with the first approach to side-outputs in CEP.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

