[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API

2016-11-24 Thread Alexander Shoshin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695133#comment-15695133
 ] 

Alexander Shoshin commented on FLINK-3133:
--

Hi [~mxm],
Can you answer my previous question, please? It seems that you missed it :)

> Introduce collect()/count()/print() methods in DataStream API
> -
>
> Key: FLINK-3133
> URL: https://issues.apache.org/jira/browse/FLINK-3133
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Streaming
>Affects Versions: 0.10.0, 1.0.0, 0.10.1
>Reporter: Maximilian Michels
>Assignee: Alexander Shoshin
> Fix For: 1.0.0
>
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should 
> be mirrored to the DataStream API. 
> The semantics of the calls are different. We need to be able to sample parts 
> of a stream, e.g. by supplying a time period in the arguments to the methods. 
> Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable sampled = jobClient.sampleStream(streamData, Time.seconds(5));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5002) Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695114#comment-15695114
 ] 

ASF GitHub Bot commented on FLINK-5002:
---

GitHub user MayerRoman opened a pull request:

https://github.com/apache/flink/pull/2865

[FLINK-5002] Renamed the getNumberOfUsedBuffers() method to 
bestEffortGetNumOfUsedBuffers() and added a test to check that it does not 
return a negative value

[FLINK-5002] Lack of synchronization in 
LocalBufferPool#getNumberOfUsedBuffers

Following Stefan's proposal, I renamed the method and added a test to make 
sure that the method does not return a negative value.
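For reference, a minimal sketch of the best-effort shape described above (an assumed implementation, not necessarily the exact code in the PR):

{code:java}
// Both fields can change concurrently while this method reads them without
// holding the pool lock, so the difference is clamped to zero to guarantee
// that the "best effort" result is never negative.
public int bestEffortGetNumOfUsedBuffers() {
    return Math.max(numberOfRequestedMemorySegments - availableMemorySegments.size(), 0);
}
{code}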

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MayerRoman/flink FLINK_5002

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2865.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2865


commit 5b7a17f54d37ac028335343742ba7021e047ca64
Author: Roman Maier 
Date:   2016-11-18T13:51:58Z

[FLINK-5002] Renamed the getNumberOfUsedBuffers() method to 
bestEffortGetNumOfUsedBuffers() and added a test to check that it does not 
return a negative value.




> Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers
> -
>
> Key: FLINK-5002
> URL: https://issues.apache.org/jira/browse/FLINK-5002
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Roman Maier
>Priority: Minor
>  Labels: easyfix, starter
>
> {code}
>   public int getNumberOfUsedBuffers() {
>     return numberOfRequestedMemorySegments - availableMemorySegments.size();
>   }
> {code}
> Access to availableMemorySegments should be protected with proper 
> synchronization as other methods do.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2865: [FLINK-5002] Renamed getNumberOfUsedBuffers() meth...

2016-11-24 Thread MayerRoman
GitHub user MayerRoman opened a pull request:

https://github.com/apache/flink/pull/2865

[FLINK-5002] Renamed the getNumberOfUsedBuffers() method to 
bestEffortGetNumOfUsedBuffers() and added a test to check that it does not 
return a negative value

[FLINK-5002] Lack of synchronization in 
LocalBufferPool#getNumberOfUsedBuffers

Following Stefan's proposal, I renamed the method and added a test to make 
sure that the method does not return a negative value.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MayerRoman/flink FLINK_5002

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2865.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2865


commit 5b7a17f54d37ac028335343742ba7021e047ca64
Author: Roman Maier 
Date:   2016-11-18T13:51:58Z

[FLINK-5002] Renamed the getNumberOfUsedBuffers() method to 
bestEffortGetNumOfUsedBuffers() and added a test to check that it does not 
return a negative value.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-1536) Graph partitioning operators for Gelly

2016-11-24 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-1536:
-

Assignee: Ivan Mushketyk

> Graph partitioning operators for Gelly
> --
>
> Key: FLINK-1536
> URL: https://issues.apache.org/jira/browse/FLINK-1536
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Smart graph partitioning can significantly improve the performance and 
> scalability of graph analysis applications. Depending on the computation 
> pattern, a graph partitioning algorithm divides the graph into (possibly 
> overlapping) subgraphs, optimizing some objective. For example, if 
> communication is performed across graph edges, one might want to minimize the 
> number of edges that cross from one partition to another.
> Graph partitioning is a well-studied problem, and several algorithms have 
> been proposed in the literature. The goal of this project is to choose a few 
> existing partitioning techniques and implement the corresponding graph 
> partitioning operators for Gelly.
> Some related literature can be found 
> [here|http://www.citeulike.org/user/vasiakalavri/tag/graph-partitioning].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4649) Implement bipartite graph metrics

2016-11-24 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15694318#comment-15694318
 ] 

Ivan Mushketyk commented on FLINK-4649:
---

Implemented here: https://github.com/mushketyk/flink/tree/bipartite-metrics
Blocked by this PR: https://github.com/apache/flink/pull/2564

> Implement bipartite graph metrics
> -
>
> Key: FLINK-4649
> URL: https://issues.apache.org/jira/browse/FLINK-4649
> Project: Flink
>  Issue Type: Sub-task
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>
> Implement metrics calculation for a bipartite graph. Should be similar to 
> EdgeMetrics and VertexMetrics. 
> Paper that describes bipartite graph metrics: 
> http://jponnela.com/web_documents/twomode.pdf 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-24 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi resolved FLINK-3702.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via 1f04542 and 870e219.

> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
> Fix For: 1.2.0
>
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697, I'll add a check for a nested POJO and fail with an 
> exception.
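For illustration, nested field access of the kind this fix enables might be used like this (the POJO types below are made up for the example; the behavior is assumed as of the fix in 1.2):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NestedFieldSumExample {

    // Minimal nested POJOs (public fields and no-arg constructors, as Flink requires).
    public static class Stats {
        public int count;
        public Stats() {}
        public Stats(int count) { this.count = count; }
    }

    public static class Event {
        public String key;
        public Stats stats;
        public Event() {}
        public Event(String key, int count) { this.key = key; this.stats = new Stats(count); }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event> events = env.fromElements(new Event("a", 1), new Event("a", 2));

        // With nested field support, aggregations can address "stats.count" directly.
        events.keyBy("key").sum("stats.count").print();

        env.execute("nested field sum");
    }
}
{code}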



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15694237#comment-15694237
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2094


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2094: [FLINK-3702] Make FieldAccessors support nested fi...

2016-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2094


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2804: [FLINK-5067] Make Flink compile with 1.8 Java compiler

2016-11-24 Thread melentye
Github user melentye commented on the issue:

https://github.com/apache/flink/pull/2804
  
@StephanEwen it seems that the semantics differ when java.version is just 
defined in pom.xml versus being overridden with a -Djava.version=x.y.z 
argument passed to mvn.

org.apache.hadoop.hbase.util.ClassSize expects a very specific format for the 
java.version system property. But when java.version is only defined in 
pom.xml, it doesn't actually become a system property, unlike when using 
-Djava.version.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5067) Make Flink compile with 1.8 Java compiler

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693968#comment-15693968
 ] 

ASF GitHub Bot commented on FLINK-5067:
---

Github user melentye commented on the issue:

https://github.com/apache/flink/pull/2804
  
@StephanEwen it seems that the semantics differ when java.version is just 
defined in pom.xml versus being overridden with a -Djava.version=x.y.z 
argument passed to mvn.

org.apache.hadoop.hbase.util.ClassSize expects a very specific format for the 
java.version system property. But when java.version is only defined in 
pom.xml, it doesn't actually become a system property, unlike when using 
-Djava.version.


> Make Flink compile with 1.8 Java compiler
> -
>
> Key: FLINK-5067
> URL: https://issues.apache.org/jira/browse/FLINK-5067
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.2.0
> Environment: macOS Sierra 10.12.1, java version "1.8.0_112", Apache 
> Maven 3.3.9
>Reporter: Andrey Melentyev
>Priority: Minor
>
> Flink fails to compile when using 1.8 as source and target in Maven. There 
> are two types of issues, both related to the new type inference rules:
> * The call to the TypeSerializer.copy method in TupleSerializer.java:112 now 
> resolves to a different overload than before, causing a compilation error: [ERROR] 
> /Users/andrey.melentyev/Dev/github.com/apache/flink/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java:[112,63]
>  incompatible types: void cannot be converted to java.lang.Object
> * A number of unit tests using assertEquals fail to compile:
> [ERROR] 
> /Users/andrey.melentyev/Dev/github.com/apache/flink/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java:[50,25]
>  reference to assertEquals is ambiguous
> [ERROR] both method assertEquals(long,long) in org.junit.Assert and method 
> assertEquals(java.lang.Object,java.lang.Object) in org.junit.Assert match
> In both of the above scenarios, explicitly casting one of the arguments helps 
> the compiler resolve the overloaded method call correctly.
> It is possible to maintain Flink's code base in a state where it can be built 
> with both 1.7 and 1.8. For this purpose we need minor code fixes and an 
> automated build in Travis to keep the new good state.
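For the second class of failures, here is a compilable illustration of the ambiguity and of the suggested cast fix (the generic accessor below is a made-up stand-in for the accumulator call in the failing test, not actual Flink code):

{code:java}
import static org.junit.Assert.assertEquals;

public class AssertAmbiguitySketch {

    // Made-up stand-in for a generic accessor whose return type is inferred
    // from the call site, as with accumulator results in the failing test.
    @SuppressWarnings("unchecked")
    static <V> V getResult() {
        return (V) Integer.valueOf(4);
    }

    public static void main(String[] args) {
        // Under -source 1.8, target typing makes both assertEquals(long, long)
        // and assertEquals(Object, Object) applicable to
        //     assertEquals(4, getResult());
        // so javac reports "reference to assertEquals is ambiguous".
        // Explicitly casting one argument pins down the overload:
        assertEquals(4, (int) AssertAmbiguitySketch.<Integer>getResult());
    }
}
{code}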



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4913) Per-job Yarn clusters: include user jar in system class loader

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693942#comment-15693942
 ] 

ASF GitHub Bot commented on FLINK-4913:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2795
  
Rebased and pushed to travis again: 
https://travis-ci.org/rmetzger/flink/builds/178664750 


> Per-job Yarn clusters: include user jar in system class loader 
> ---
>
> Key: FLINK-4913
> URL: https://issues.apache.org/jira/browse/FLINK-4913
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, YARN Client
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.2.0, 1.1.4
>
>
> Including the jar directly in the system classloader avoids loading it for 
> every instantiation of the ExecutionGraph and every Task execution. Note, 
> this is only possible for per-job clusters (i.e. Yarn/Mesos).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2795: Revert "[FLINK-4913][yarn] include user jars in system cl...

2016-11-24 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2795
  
Rebased and pushed to travis again: 
https://travis-ci.org/rmetzger/flink/builds/178664750 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2856: Removed excessive tests.

2016-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2856


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5055) Security feature crashes JM for certain Hadoop versions even though using no Kerberos

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693816#comment-15693816
 ] 

ASF GitHub Bot commented on FLINK-5055:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2864

[FLINK-5055][security] skip Hadoop UGI login if unsecured

The new Kerberos authentication code in Flink assumed that it is running 
against vanilla Hadoop, whose behavior is to skip the secure login if 
security is not configured. This is different for other distributions, 
e.g. the MapR distribution of Hadoop.

Thus, we need to make sure we don't perform any login action if security is 
not configured.

This also performs minor code cleanup.
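A minimal sketch of that guard (the method and variable names are assumed; only UserGroupInformation calls from Hadoop's public API are used):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiLoginGuard {

    // Skip any login action when security is not configured, so distributions
    // that eagerly attempt a Kerberos login do not fail on unsecured setups.
    public static void loginIfSecured(Configuration hadoopConfig,
                                      String principal,
                                      String keytabPath) throws java.io.IOException {
        UserGroupInformation.setConfiguration(hadoopConfig);
        if (!UserGroupInformation.isSecurityEnabled()) {
            // Unsecured cluster: do not attempt a Kerberos login at all.
            return;
        }
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    }
}
{code}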

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-5055

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2864.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2864


commit 8193024a6451dd2594348ac0f001ed39b80f7302
Author: Maximilian Michels 
Date:   2016-11-24T16:12:39Z

[FLINK-5055][security] skip Hadoop UGI login if unsecured

The new Kerberos authentication code in Flink assumed that it is running
against vanilla Hadoop, whose behavior is to skip the secure login if
security is not configured. This is different for other
distributions, e.g. the MapR distribution of Hadoop.

Thus, we need to make sure we don't perform any login action if security
is not configured.

This also performs minor code cleanup.




> Security feature crashes JM for certain Hadoop versions even though using no 
> Kerberos
> -
>
> Key: FLINK-5055
> URL: https://issues.apache.org/jira/browse/FLINK-5055
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.2.0
>
>
> A user reported [1] that the {{JobManager}} does not start when using Flink 
> with Hadoop-2.7.0-mapr-1607 and no security activated because of 
> {code}
> javax.security.auth.login.LoginException: Unable to obtain Principal Name for 
> authentication
> at 
> com.sun.security.auth.module.Krb5LoginModule.promptForName(Krb5LoginModule.java:841)
> at 
> com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:704)
> at 
> com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
> {code}
> It seems that this Hadoop version always tries to log in via Kerberos even 
> though the user did not activate it and thus should use 
> {{AuthenticationMode.SIMPLE}}.
> I'm not really familiar with the security feature, but my understanding is 
> that it should not have any effect on Flink when not activated. I might be 
> wrong here, but if not, then we should fix this problem for 1.2.0 because it 
> prevents people from using Flink.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-using-Yarn-on-MapR-td14484.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2864: [FLINK-5055][security] skip Hadoop UGI login if un...

2016-11-24 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2864

[FLINK-5055][security] skip Hadoop UGI login if unsecured

The new Kerberos authentication code in Flink assumed that it is running 
against vanilla Hadoop, whose behavior is to skip the secure login if 
security is not configured. This is different for other distributions, 
e.g. the MapR distribution of Hadoop.

Thus, we need to make sure we don't perform any login action if security is 
not configured.

This also performs minor code cleanup.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-5055

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2864.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2864


commit 8193024a6451dd2594348ac0f001ed39b80f7302
Author: Maximilian Michels 
Date:   2016-11-24T16:12:39Z

[FLINK-5055][security] skip Hadoop UGI login if unsecured

The new Kerberos authentication code in Flink assumed that it is running
against vanilla Hadoop, whose behavior is to skip the secure login if
security is not configured. This is different for other
distributions, e.g. the MapR distribution of Hadoop.

Thus, we need to make sure we don't perform any login action if security
is not configured.

This also performs minor code cleanup.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693706#comment-15693706
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Sure,
I will do it tomorrow :)


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
>   // set up execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>
>   val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>
>   // register the DataSet as table "WordCount"
>   tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
>   tEnv.registerTable("WordCount2",
>     tEnv.fromDataSet(input, 'word, 'frequency).select('word).filter('word !== "hello"))
>
>   // run a SQL query on the Table and retrieve the result as a new Table
>   val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
>
>   table.toDataSet[WC].print()
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Sure,
I will do it tomorrow :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5051) Backwards compatibility for serializers in backend state

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693681#comment-15693681
 ] 

ASF GitHub Bot commented on FLINK-5051:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2863
  
R @aljoscha and whoever is interested.


> Backwards compatibility for serializers in backend state
> 
>
> Key: FLINK-5051
> URL: https://issues.apache.org/jira/browse/FLINK-5051
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>
> When a new state is registered, e.g. in a keyed backend via 
> `getPartitionedState`, the caller has to provide all type serializers 
> required for the persistence of state components. Explicitly passing the 
> serializers on state creation already allows for potential version upgrades 
> of serializers.
> However, those serializers are currently not part of any snapshot and are 
> only provided at runtime, when the state is newly registered or restored. For 
> backwards compatibility, this has strong implications: checkpoints are not 
> self-contained, in that state is currently a black box without knowledge about 
> its corresponding serializers. Most cases where we would need to restructure 
> the state are basically lost. We could only convert it lazily at runtime, and 
> only once the user registers the concrete state, which might happen at 
> unpredictable points.
> I suggest to adapt our solution as follows:
> - As now, all states are registered with their set of serializers.
> - Unlike now, all serializers are written to the snapshot. This makes 
> savepoints self-contained and also allows us to create inspection tools for 
> savepoints at some point in the future.
> - Introduce an interface {{Versioned}} with {{long getVersion()}} and 
> {{boolean isCompatible(Versioned v)}} which is then implemented by 
> serializers. Compatible serializers must ensure that they can deserialize 
> older versions, and can then serialize the data in their new format. This is 
> how we upgrade.
> We need to find the right tradeoff in how many places we need to store the 
> serializers. I suggest to write them once per parallel operator instance for 
> each state, i.e. we have a map with state_name -> tuple3<serializer, 
> serializer, serializer>. This could go before all key-groups are written, 
> right at the head of the file. Then, for each file we see on restore, we can 
> first read the serializer map from the head of the stream, then go through 
> the key groups by offset.
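Written out, the interface proposed in the description would look roughly like this (a sketch of the proposal as stated above, not necessarily the API that was eventually merged):

{code:java}
// Compatibility contract for serializers: a compatible serializer must be
// able to deserialize data written by older versions and re-serialize it
// in its new format.
public interface Versioned {

    // Format version of this serializer, increased on incompatible changes.
    long getVersion();

    // True if this serializer can read data written by the given version.
    boolean isCompatible(Versioned v);
}
{code}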



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2863: [FLINK-5051] Backwards compatibility for serializers in b...

2016-11-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2863
  
R @aljoscha and whoever is interested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5051) Backwards compatibility for serializers in backend state

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693679#comment-15693679
 ] 

ASF GitHub Bot commented on FLINK-5051:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2863

[FLINK-5051] Backwards compatibility for serializers in backend state

This PR sits on top of PR #2781 and introduces backwards compatibility for 
state serializers in keyed backends. We do so by providing version 
compatibility checking for ``TypeSerializer`` and making the serializers a 
mandatory part of a keyed backend's metadata in checkpoints (so that we have 
everything required to reconstruct states in a self-contained way). A 
serialization proxy is introduced for keyed backend state. Currently this 
serialization proxy covers the metadata, not yet the actual data. As the PR 
essentially moves functionality to a different place, it is already covered by 
existing tests.

Notice: we should introduce a similar approach for 
``OperatorStateBackend``s.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 
serializer-backwards-compatibility

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2863.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2863


commit a373585c2fe71b467f49f0e295dc647b43ab7a9c
Author: Stefan Richter 
Date:   2016-11-01T11:29:01Z

Backwards compatibility 1.1 -> 1.2

commit 8e4e4bcede50e66a95928ec854e51d45a7df28bf
Author: Stefan Richter 
Date:   2016-11-09T13:54:35Z

Removing some unnecessary code from migration classes

commit 78bd66fade7f836eafbab978329caf1ea26f2ffc
Author: Stefan Richter 
Date:   2016-11-09T17:21:13Z

MultiStreamStateHandle

commit a9355679c3476dd890b54312e1696b61c7839873
Author: Stefan Richter 
Date:   2016-11-10T13:18:55Z

Added migration unit test

commit d079bd4bdb762c307a3c5cd084590804b90996b1
Author: Stefan Richter 
Date:   2016-11-10T13:45:58Z

rebase fixes

commit 9f47bac9c25fc33993c3942a57462039cc578dcd
Author: Stefan Richter 
Date:   2016-11-11T13:46:39Z

Minor cleanups: deleting more unnecessary classes

commit 2bbe66386d28c7914c62e2c3829ff3ab6840164c
Author: Stefan Richter 
Date:   2016-11-23T13:15:33Z

Versioned serialization

commit 6460e27717ab208aada988ba2c83d5628b31b310
Author: Stefan Richter 
Date:   2016-11-23T17:59:45Z

Common meta info introduced to keyed backends

commit e7d66377730339523bad8e3e6e75865ea5a29a6b
Author: Stefan Richter 
Date:   2016-11-23T21:40:26Z

Introducing isCompatibleWith to TypeSerializers

commit 89e3779d231fd0dadb01782791c92ec8ebb15a81
Author: Stefan Richter 
Date:   2016-11-23T22:33:42Z

Splitting / Introducing interfaces for versioned and compatible

commit 6714a7efd3d839befda7a9b744311494e4ecb714
Author: Stefan Richter 
Date:   2016-11-24T10:59:01Z

Cleanup and documentation

commit 6df300f7f5a7d7b38b00ecd6636ecd53bc15d370
Author: Stefan Richter 
Date:   2016-11-24T11:18:43Z

Cleanup and documentation

commit b22273455d8f9282af715e502244811543e3fb99
Author: Stefan Richter 
Date:   2016-11-24T16:19:51Z

Better abstraction




> Backwards compatibility for serializers in backend state
> 
>
> Key: FLINK-5051
> URL: https://issues.apache.org/jira/browse/FLINK-5051
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>
> When a new state is registered, e.g. in a keyed backend via 
> `getPartitionedState`, the caller has to provide all type serializers 
> required for the persistence of state components. Explicitly passing the 
> serializers on state creation already allows for potential version upgrades 
> of serializers.
> However, those serializers are currently not part of any snapshot and are 
> only provided at runtime, when the state is newly registered or restored. For 
> backwards compatibility, this has strong implications: checkpoints are not 
> self-contained, in that state is currently a black box without knowledge about 
> its corresponding serializers. Most cases where we would need to restructure 
> the state are basically lost. We could only convert it lazily at runtime, and 
> only once the user registers the concrete state, which might happen at 
> unpredictable points.
> I suggest to adapt our solution as follows:
> - As now, all states are registered with their set of serializers.
> - Unlike now, all serializers are written to the snapshot. This makes 
> savepoints self-contained and also allows us to create inspection tools for 
> savepoints at some point in the future.
> - Introduce an interface {{Versioned}} with {{long getVersion()}} and 
> {{boolean isCompatible(Versioned v)}} which is then implemented by 
> serializers. Compatible serializers must ensure that they can deserialize 
> older versions, and can then serialize the data in their new format. This is 
> how we upgrade.
> We need to find the right tradeoff in how many places we need to store the 
> serializers. I suggest to write them once per parallel operator instance for 
> each state, i.e. we have a map with state_name -> tuple3<serializer, 
> serializer, serializer>. This could go before all key-groups are written, 
> right at the head of the file. Then, for each file we see on restore, we can 
> first read the serializer map from the head of the stream, then go through 
> the key groups by offset.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

[GitHub] flink pull request #2863: [FLINK-5051] Backwards compatibility for serialize...

2016-11-24 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2863

[FLINK-5051] Backwards compatibility for serializers in backend state

This PR sits on top of PR #2781 and introduces backwards compatibility for 
state serializers in keyed backends. We do so by providing version 
compatibility checking for ``TypeSerializer`` and making the serializers a 
mandatory part of a keyed backend's metadata in checkpoints (so that we have 
everything required to reconstruct states in a self-contained way). A 
serialization proxy is introduced for keyed backend state. Currently this 
serialization proxy covers the metadata, not yet the actual data. As the PR 
essentially moves functionality to a different place, it is already covered by 
existing tests.

Notice: we should introduce a similar approach for 
``OperatorStateBackend``s.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 
serializer-backwards-compatibility

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2863.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2863


commit a373585c2fe71b467f49f0e295dc647b43ab7a9c
Author: Stefan Richter 
Date:   2016-11-01T11:29:01Z

Backwards compatibility 1.1 -> 1.2

commit 8e4e4bcede50e66a95928ec854e51d45a7df28bf
Author: Stefan Richter 
Date:   2016-11-09T13:54:35Z

Removing some unnecessary code from migration classes

commit 78bd66fade7f836eafbab978329caf1ea26f2ffc
Author: Stefan Richter 
Date:   2016-11-09T17:21:13Z

MultiStreamStateHandle

commit a9355679c3476dd890b54312e1696b61c7839873
Author: Stefan Richter 
Date:   2016-11-10T13:18:55Z

Added migration unit test

commit d079bd4bdb762c307a3c5cd084590804b90996b1
Author: Stefan Richter 
Date:   2016-11-10T13:45:58Z

rebase fixes

commit 9f47bac9c25fc33993c3942a57462039cc578dcd
Author: Stefan Richter 
Date:   2016-11-11T13:46:39Z

Minor cleanups: deleting more unnecessary classes

commit 2bbe66386d28c7914c62e2c3829ff3ab6840164c
Author: Stefan Richter 
Date:   2016-11-23T13:15:33Z

Versioned serialization

commit 6460e27717ab208aada988ba2c83d5628b31b310
Author: Stefan Richter 
Date:   2016-11-23T17:59:45Z

Common meta info introduced to keyed backends

commit e7d66377730339523bad8e3e6e75865ea5a29a6b
Author: Stefan Richter 
Date:   2016-11-23T21:40:26Z

Introducing isCompatibleWith to TypeSerializers

commit 89e3779d231fd0dadb01782791c92ec8ebb15a81
Author: Stefan Richter 
Date:   2016-11-23T22:33:42Z

Splitting / Introducing interfaces for versioned and compatible

commit 6714a7efd3d839befda7a9b744311494e4ecb714
Author: Stefan Richter 
Date:   2016-11-24T10:59:01Z

Cleanup and documentation

commit 6df300f7f5a7d7b38b00ecd6636ecd53bc15d370
Author: Stefan Richter 
Date:   2016-11-24T11:18:43Z

Cleanup and documentation

commit b22273455d8f9282af715e502244811543e3fb99
Author: Stefan Richter 
Date:   2016-11-24T16:19:51Z

Better abstraction




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693660#comment-15693660
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2811
  
It's true, the requirements of this issue have evolved quite a bit and we 
should adapt the JIRA issue for that ;-)
+1 for creating an issue to support arbitrary inner joins with a single row 
input and addressing that issue with this PR.

Once that issue is resolved, we can revisit FLINK-4541 and see what's left 
to do.

@AlexanderShoshin, can you open the JIRA issue and adapt this PR to it?


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
>   // set up execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>
>   val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>
>   // register the DataSet as table "WordCount"
>   tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
>   tEnv.registerTable("WordCount2",
>     tEnv.fromDataSet(input, 'word, 'frequency).select('word).filter('word !== "hello"))
>
>   // run a SQL query on the Table and retrieve the result as a new Table
>   val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
>
>   table.toDataSet[WC].print()
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2811
  
It's true, the requirements of this issue have evolved quite a bit and we 
should adapt the JIRA issue for that ;-)
+1 for creating an issue to support arbitrary inner joins with a single row 
input and addressing that issue with this PR.

Once that issue is resolved, we can revisit FLINK-4541 and see what's left 
to do.

@AlexanderShoshin, can you open the JIRA issue and adapt this PR to it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-11-24 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2664
  
I had the same thought. We could add the maven assembly plugin / shade 
plugin to each connector / library to build a fat jar, and then add some logic 
to flink-dist to collect these fat jars into the final dist.
I'm not sure how easy it is to pull build outputs from other modules into 
the dist module, but we are doing that for the examples as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693654#comment-15693654
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2664
  
I had the same thought. We could add the maven assembly plugin / shade 
plugin to each connector / library to build a fat jar, and then add some logic 
to flink-dist to collect these fat jars into the final dist.
I'm not sure how easy it is to pull build outputs from other modules into 
the dist module, but we are doing that for the examples as well.


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator

2016-11-24 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-5158:


Assignee: Till Rohrmann

> Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
> 
>
> Key: FLINK-5158
> URL: https://issues.apache.org/jira/browse/FLINK-5158
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator does not properly handle exceptions when trying to 
> store completed checkpoints. As a result, completed checkpoints are not 
> properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get 
> stuck and stop triggering checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator

2016-11-24 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5158:


 Summary: Handle ZooKeeperCompletedCheckpointStore exceptions in 
CheckpointCoordinator
 Key: FLINK-5158
 URL: https://issues.apache.org/jira/browse/FLINK-5158
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.1.3, 1.2.0
Reporter: Till Rohrmann
 Fix For: 1.2.0, 1.1.4


The checkpoint coordinator does not properly handle exceptions when trying to 
store completed checkpoints. As a result, completed checkpoints are not 
properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get 
stuck and stop triggering checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
I think that calling a JoinFunction inside a RichMapRunner makes sense. I 
would also prefer not to touch the code generation if possible.
But shouldn't we separate the support of all inner joins with a single-row 
input from this "NOT IN" pull request? We might create a new JIRA issue to do 
this in another pull request.
What do you think?
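For concreteness, a sketch of the idea discussed here (all class and variable names are assumed; this is not code from the PR): a JoinFunction applied inside a RichMapFunction, with the single-row input shipped as a broadcast set.

{code:java}
import java.util.List;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Joins every element of the large input with the single row of the other
// input, without touching the generated join code.
public class SingleRowJoinRunner<L, R, O> extends RichMapFunction<L, O> {

    private final JoinFunction<L, R, O> joinFunction;
    private transient R singleRow;

    public SingleRowJoinRunner(JoinFunction<L, R, O> joinFunction) {
        this.joinFunction = joinFunction;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // The one-row side is distributed to every task as a broadcast set.
        List<R> broadcast = getRuntimeContext().getBroadcastVariable("singleRow");
        singleRow = broadcast.get(0);
    }

    @Override
    public O map(L value) throws Exception {
        return joinFunction.join(value, singleRow);
    }
}
{code}

The large side would then be mapped with .map(new SingleRowJoinRunner<>(joinFn)).withBroadcastSet(singleRowInput, "singleRow").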


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693640#comment-15693640
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
I think that calling a JoinFunction inside a RichMapRunner makes sense. I 
would also prefer not to touch the code generation if possible.
But shouldn't we separate the support of all inner joins with a single-row 
input from this "NOT IN" pull request? We might create a new JIRA issue to do 
this in another pull request.
What do you think?


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
>   // set up execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>
>   val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>
>   // register the DataSet as table "WordCount"
>   tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
>   tEnv.registerTable("WordCount2",
>     tEnv.fromDataSet(input, 'word, 'frequency).select('word).filter('word !== "hello"))
>
>   // run a SQL query on the Table and retrieve the result as a new Table
>   val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
>
>   table.toDataSet[WC].print()
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-24 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693565#comment-15693565
 ] 

Ivan Mushketyk commented on FLINK-2254:
---

Hi [~vkalavri],
Thank you for your advice. I've implemented simple metrics (will push them 
today) but, like many other bipartite graph PRs, it is blocked by this PR: 
https://github.com/apache/flink/pull/2564

Regarding the clustering coefficient: do you want me to add another JIRA task 
for this? Should it address one type of clustering coefficient, or should we 
have multiple types implemented?

> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge has its source vertex in the 
> first set and its target vertex in the second set. We would like to 
> support efficient operations for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 
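For orientation, constructing such a graph along the lines proposed in PR #2564 might look as follows (the API shape is assumed from the pull request, which was not yet merged at the time of this thread):

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.bipartite.BipartiteEdge;
import org.apache.flink.graph.bipartite.BipartiteGraph;

public class BipartiteExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Top vertices (e.g. authors) and bottom vertices (e.g. papers).
        DataSet<Vertex<Long, String>> top = env.fromElements(
            new Vertex<>(1L, "author-1"), new Vertex<>(2L, "author-2"));
        DataSet<Vertex<Long, String>> bottom = env.fromElements(
            new Vertex<>(10L, "paper-a"), new Vertex<>(11L, "paper-b"));

        // Every edge connects a top vertex to a bottom vertex.
        DataSet<BipartiteEdge<Long, Long, String>> edges = env.fromElements(
            new BipartiteEdge<>(1L, 10L, "wrote"),
            new BipartiteEdge<>(2L, 10L, "wrote"),
            new BipartiteEdge<>(2L, 11L, "wrote"));

        BipartiteGraph<Long, Long, String, String, String> graph =
            BipartiteGraph.fromDataSet(top, bottom, edges, env);

        graph.getEdges().print();
    }
}
{code}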



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-24 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693565#comment-15693565
 ] 

Ivan Mushketyk edited comment on FLINK-2254 at 11/24/16 3:40 PM:
-

Hi [~vkalavri],
Thank you for your advice. I've implemented simple metrics (will push them 
today) but, like many other bipartite graph PRs, it is blocked by this PR: 
https://github.com/apache/flink/pull/2564

Regarding the clustering coefficient: do you want me to add another JIRA task 
for this? Should it address one type of clustering coefficient, or should we 
have multiple types implemented?


was (Author: ivan.mushketyk):
Hi [~vkalavri]
Thank you for your advice I've implemented simple metrics (will push them 
today) but as many other bipartite graph PR it is blocked by this PR: 
https://github.com/apache/flink/pull/2564

Regarding clustering coefficient. Do you want me to add another JIRA task for 
this? Should it address one type of clustering coefficient or should we have 
multiple types implemented?

> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge has its source vertex in the 
> first set and its target vertex in the second set. We would like to 
> support efficient operations for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4741) WebRuntimeMonitor does not shut down all of its threads (EventLoopGroups) on exit.

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693546#comment-15693546
 ] 

ASF GitHub Bot commented on FLINK-4741:
---

GitHub user MayerRoman opened a pull request:

https://github.com/apache/flink/pull/2862

[FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when 
stopping the WebRuntimeMonitor

[FLINK-4741] WebRuntimeMonitor does not shut down all of its threads 
(EventLoopGroups) on exit.

A call to the bootstrap.childGroup().shutdownGracefully() method has been 
added so that the WebRuntimeMonitor shuts down correctly.

More detailed explanation:

bootstrap is an io.netty.bootstrap.ServerBootstrap, a helper class that 
sets up a Netty server.

The Netty server uses two EventLoopGroups: 
The first one, often called the 'boss', accepts incoming connections. 
The second one, often called the 'worker', handles the traffic of an accepted 
connection once the boss accepts it and registers the accepted connection 
with the worker.

When the Netty server stops, all threads in each of the two EventLoopGroups 
should be shut down.

The first EventLoopGroup is stopped by calling 
bootstrap.group().shutdownGracefully().
To stop the second EventLoopGroup, this pull request adds a call to the 
bootstrap.childGroup().shutdownGracefully() method.


This proposal follows the Netty user guide:
http://netty.io/wiki/user-guide-for-4.x.html
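In code, the change amounts to shutting down both groups (a sketch based on the methods named above):

{code:java}
// Stop the 'boss' group that accepts incoming connections...
if (bootstrap.group() != null) {
    bootstrap.group().shutdownGracefully();
}
// ...and the 'worker' group that serves the accepted connections, which was
// previously left running and kept the application from exiting.
if (bootstrap.childGroup() != null) {
    bootstrap.childGroup().shutdownGracefully();
}
{code}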

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MayerRoman/flink FLINK-4741

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2862.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2862


commit 0d43c9a92e806e2c663d61b8db5098622474bf0e
Author: Roman Maier 
Date:   2016-11-24T12:49:00Z

[FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when 
stopping the WebRuntimeMonitor




> WebRuntimeMonitor does not shut down all of its threads (EventLoopGroups) on 
> exit.
> ---
>
> Key: FLINK-4741
> URL: https://issues.apache.org/jira/browse/FLINK-4741
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.1.2
>Reporter: Joseph Sims
>Assignee: Roman Maier
>Priority: Minor
>
> WebRuntimeMonitor does not shut down correctly, causing the overall 
> application to hang on shutdown. It shuts down the bootstrap.group 
> (EventLoopGroup) but not the bootstrap.childGroup (EventLoopGroup).
> If the WebRuntimeMonitor is not used (local.start-webserver=false), this 
> problem does not occur.
> Class: WebRuntimeMonitor
> method: stop()
> Line: ~387
> Called:
> bootstrap.group().shutdownGracefully()
> Not called:
> bootstrap.childGroup().shutdownGracefully()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2862: [FLINK-4741] Fix for the proper shutdown the Serve...

2016-11-24 Thread MayerRoman
GitHub user MayerRoman opened a pull request:

https://github.com/apache/flink/pull/2862

[FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when 
stopping the WebRuntimeMonitor

[FLINK-4741] WebRuntimeMonitor does not shut down all of its threads 
(EventLoopGroups) on exit.

A call to the bootstrap.childGroup().shutdownGracefully() method has been 
added so that the WebRuntimeMonitor shuts down correctly.

More detailed explanation:

bootstrap is an io.netty.bootstrap.ServerBootstrap, a helper class that 
sets up a Netty server.

The Netty server uses two EventLoopGroups: 
The first one, often called the 'boss', accepts incoming connections. 
The second one, often called the 'worker', handles the traffic of an accepted 
connection once the boss accepts it and registers the accepted connection 
with the worker.

When the Netty server stops, all threads in each of the two EventLoopGroups 
should be shut down.

The first EventLoopGroup is stopped by calling 
bootstrap.group().shutdownGracefully().
To stop the second EventLoopGroup, this pull request adds a call to the 
bootstrap.childGroup().shutdownGracefully() method.


This proposal follows the Netty user guide:
http://netty.io/wiki/user-guide-for-4.x.html

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MayerRoman/flink FLINK-4741

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2862.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2862


commit 0d43c9a92e806e2c663d61b8db5098622474bf0e
Author: Roman Maier 
Date:   2016-11-24T12:49:00Z

[FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when 
stopping the WebRuntimeMonitor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693506#comment-15693506
 ] 

ASF GitHub Bot commented on FLINK-2254:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
@vasia I don't think anybody is shepherding this PR :)


> Add Bipartite Graph Support for Gelly
> -
>
> Key: FLINK-2254
> URL: https://issues.apache.org/jira/browse/FLINK-2254
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 0.10.0
>Reporter: Andra Lungu
>Assignee: Ivan Mushketyk
>  Labels: requires-design-doc
>
> A bipartite graph is a graph for which the set of vertices can be divided 
> into two disjoint sets such that each edge has its source vertex in the 
> first set and its target vertex in the second set. We would like to 
> support efficient operations for this type of graph along with a set of 
> metrics (http://jponnela.com/web_documents/twomode.pdf). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2564: [FLINK-2254] Add BipartiateGraph class

2016-11-24 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2564
  
@vasia I don't think anybody is shepherding this PR :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-11-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2664
  
How about just building a fat jar for each connector / library?
That way it becomes quite easy for users - they simply refer to one jar.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693446#comment-15693446
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2664
  
How about just building a fat jar for each connector / library?
That way it becomes quite easy for users - they simply refer to one jar.


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5157) Extending AllWindow Function Metadata

2016-11-24 Thread Ventura Del Monte (JIRA)
Ventura Del Monte created FLINK-5157:


 Summary: Extending AllWindow Function Metadata
 Key: FLINK-5157
 URL: https://issues.apache.org/jira/browse/FLINK-5157
 Project: Flink
  Issue Type: New Feature
  Components: DataStream API, Streaming, Windowing Operators
Reporter: Ventura Del Monte
Assignee: Ventura Del Monte
 Fix For: 1.2.0


Following the logic behind [1, 2], a ProcessAllWindowFunction can be introduced 
in Flink, and AllWindowedStream can be extended to support it. 

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata

[2] https://issues.apache.org/jira/browse/FLINK-4997
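
By analogy with FLIP-2's ProcessWindowFunction, the new interface could look 
roughly like this (a sketch only; the final API may differ):

{code}
import org.apache.flink.api.common.functions.Function
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector

// Sketch: a keyless counterpart of FLIP-2's ProcessWindowFunction.
abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function {

  // Context exposes window metadata to the user function.
  abstract class Context {
    def window: W
  }

  @throws[Exception]
  def process(context: Context, elements: java.lang.Iterable[IN], out: Collector[OUT]): Unit
}
{code}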



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2856: Removed excessive tests.

2016-11-24 Thread chermenin
Github user chermenin commented on the issue:

https://github.com/apache/flink/pull/2856
  
Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693381#comment-15693381
 ] 

ASF GitHub Bot commented on FLINK-4712:
---

Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89503899
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with 
WithParameters {
 */
   def evaluate[Testing, PredictionValue](
   testing: DataSet[Testing],
-  evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-  evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+  evaluateParameters: ParameterMap = ParameterMap.Empty)
+  (implicit evaluator: EvaluateDataSetOperation[Self, Testing, 
PredictionValue])
 : DataSet[(PredictionValue, PredictionValue)] = {
 FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
 evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+rankingPredictOperation.predictRankings(this, k, users, 
predictParameters)
+
+  def evaluateRankings(
+testing: DataSet[(Int,Int,Double)],
+evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+// todo: do not burn 100 topK into code
+predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+instance: Instance,
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training 
[[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+* Retrieving the training items.
+* Although this can be calculated from the training data, it requires 
a costly
+* [[DataSet.distinct]] operation, while in matrix factor models the 
set items could be
+* given more efficiently from the item factors.
+*/
+  def getTrainingItems: DataSet[Int] = {
+getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the 
predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: 
TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, 
Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], 
exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+users.cross(items)
--- End diff --

I completely agree. There are use-cases where we would not like to give 
rankings from all the items. E.g. when recommending TV programs, we would only 
like to recommend currently running TV programs, but train on all of them.

We'll include an `item` DataSet parameter to ranking predictions.

(Btw. I believe the "Flink-way" is to let the user configure as much as 
possible, but that's just my opinion :) )


> Implementing ranking predictions for ALS
> 
>
> Key: FLINK-4712
> URL: https://issues.apache.org/jira/browse/FLINK-4712
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Domokos Miklós Kelen
>Assignee: Gábor Hermann
>
> We started working on implementing ranking predictions for recommender 
> systems. Ranking prediction means that beside predicting scores for user-item 
> pairs, the recommender system is able to recommend a top K list for the users.
> Details:
> In practice, this would mean finding the K items for a particular user with 
> the highest predicted rating. It should be possible also to specify whether 
> to exclude the already seen items from a particular user's toplist. (See for 
> ex

[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-24 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89503899
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with 
WithParameters {
 */
   def evaluate[Testing, PredictionValue](
   testing: DataSet[Testing],
-  evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-  evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+  evaluateParameters: ParameterMap = ParameterMap.Empty)
+  (implicit evaluator: EvaluateDataSetOperation[Self, Testing, 
PredictionValue])
 : DataSet[(PredictionValue, PredictionValue)] = {
 FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
 evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+rankingPredictOperation.predictRankings(this, k, users, 
predictParameters)
+
+  def evaluateRankings(
+testing: DataSet[(Int,Int,Double)],
+evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+// todo: do not burn 100 topK into code
+predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+instance: Instance,
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training 
[[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+* Retrieving the training items.
+* Although this can be calculated from the training data, it requires 
a costly
+* [[DataSet.distinct]] operation, while in matrix factor models the 
set items could be
+* given more efficiently from the item factors.
+*/
+  def getTrainingItems: DataSet[Int] = {
+getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the 
predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: 
TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, 
Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], 
exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+users.cross(items)
--- End diff --

I completely agree. There are use-cases where we would not like to give 
rankings from all the items. E.g. when recommending TV programs, we would only 
like to recommend currently running TV programs, but train on all of them.

We'll include an `item` DataSet parameter to ranking predictions.

(Btw. I believe the "Flink-way" is to let the user configure as much as 
possible, but that's just my opinion :) )
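
The extended operation might then look roughly like this (a hypothetical 
signature based on the `RankingPredictOperation` trait from the diff above):

{code}
import org.apache.flink.api.scala.DataSet
import org.apache.flink.ml.common.ParameterMap

// Hypothetical: let the caller restrict the candidate items to rank,
// e.g. only the currently running TV programs.
trait RankingPredictOperation[Instance] {
  def predictRankings(
      instance: Instance,
      k: Int,
      users: DataSet[Int],
      items: DataSet[Int], // new parameter: candidate items
      predictParameters: ParameterMap = ParameterMap.Empty)
    : DataSet[(Int, Int, Int)]
}
{code}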


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4450) update storm version to 1.0.0

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693361#comment-15693361
 ] 

ASF GitHub Bot commented on FLINK-4450:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2439
  
Thanks!

You can squash commits by rebasing. Have a look here: 
http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html


> update storm version to 1.0.0
> -
>
> Key: FLINK-4450
> URL: https://issues.apache.org/jira/browse/FLINK-4450
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: yuzhongliu
> Fix For: 2.0.0
>
>
> The Storm package path was changed in the new version.
> Storm old version package:
> backtype.storm.*
> Storm new version package:
> org.apache.storm.*
> Shall we update the flink/flink-storm code to the new Storm version?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5055) Security feature crashes JM for certain Hadoop versions even though using no Kerberos

2016-11-24 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693362#comment-15693362
 ] 

Maximilian Michels commented on FLINK-5055:
---

That seems like a logical explanation. However, Till and I were able to 
reproduce the problem with a standard Hadoop config (security turned off). I'm 
looking into the cause of the problem now.
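
One way to guard against this in the security code (a sketch, assuming Hadoop's 
standard {{UserGroupInformation}} API; {{principal}} and {{keytabPath}} are 
placeholders):

{code}
import org.apache.hadoop.security.UserGroupInformation

// Sketch: only attempt a Kerberos login when Hadoop security is enabled,
// i.e. hadoop.security.authentication is not "simple".
if (UserGroupInformation.isSecurityEnabled) {
  UserGroupInformation.loginUserFromKeytab(principal, keytabPath)
} else {
  // AuthenticationMode.SIMPLE: no Kerberos interaction needed.
}
{code}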

> Security feature crashes JM for certain Hadoop versions even though using no 
> Kerberos
> -
>
> Key: FLINK-5055
> URL: https://issues.apache.org/jira/browse/FLINK-5055
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.2.0
>
>
> A user reported [1] that the {{JobManager}} does not start when using Flink 
> with Hadoop-2.7.0-mapr-1607 and no security activated because of 
> {code}
> javax.security.auth.login.LoginException: Unable to obtain Principal Name for 
> authentication
> at 
> com.sun.security.auth.module.Krb5LoginModule.promptForName(Krb5LoginModule.java:841)
> at 
> com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:704)
> at 
> com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
> {code}
> It seems that this Hadoop version always tries to log in via Kerberos even 
> though the user did not activate it and thus should use 
> {{AuthenticationMode.SIMPLE}}.
> I'm not really familiar with the security feature, but my understanding is 
> that it should not have any effect on Flink when not activated. I might be 
> wrong here, but if not, then we should fix this problem for 1.2.0 because it 
> prevents people from using Flink.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-using-Yarn-on-MapR-td14484.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693360#comment-15693360
 ] 

ASF GitHub Bot commented on FLINK-4712:
---

Github user gaborhermann commented on the issue:

https://github.com/apache/flink/pull/2838
  
Thanks again for taking a look at our PR!

I've just realized from a developer mailing list thread that the FlinkML 
API is not set in stone until 2.0, and it's nice to hear that :)

The problem is not with `evaluate(test: TestType): DataSet[Double]` but 
rather with `evaluate(test: TestType): DataSet[(Prediction,Prediction)]`. It's 
at least confusing to have both, and it might not be worth exposing the one 
giving `(Prediction,Prediction)` pairs to the user, as it only *prepares* 
evaluation. When introducing the evaluation framework, we could at least rename 
it to something like `preparePairwiseEvaluation(test: TestType): 
DataSet[(Prediction,Prediction)]`. In the ranking case we might generalize it 
to `prepareEvaluation(test: TestType): PreparedTesting`. We basically did this 
with `PrepareDataSetOperation`; we've just left the old `evaluate` as it is 
for now. I suggest changing this if we can break the API.

I'll do a rebase on the cross-validation PR. At first glance, it should not 
really be a problem to do both cross-validation and hyper-parameter tuning, as 
the user has to provide a `Scorer` anyway. A minor issue I see is the user 
falling back to a default `score` (e.g. RMSE in the case of ALS). This might 
not be a problem for recommendation models that give rating predictions besides 
ranking predictions, but it is a problem for models that *only* give ranking 
predictions, because those do not extend the `Predictor` class. This is not an 
issue for now, but might become one when adding more recommendation models. 
Should we try to do this now, or is it a bit of "overengineering"? I'll see if 
any other problems come up after rebasing.

The `RankingPredictor` interface is useful *internally* for the `Score`s: 
it serves as a contract between a `RankingScore` and the model. I'm sure it 
will be used only for recommendations, but it's no effort to expose it, so the 
user can write code against a general `RankingPredictor` (although I don't 
think this is what users would like to do :) ). A better question is whether to 
use it in a `Pipeline`. We discussed this with some people and could not really 
find a use case where we would need `Transformer`-like preprocessing for 
recommendations. Of course, there could be other preprocessing steps, such as 
removing/aggregating duplicates, but those do not have to be `fit` to training 
data. Based on this, it's not worth the effort to integrate `RankingPredictor` 
with the `Pipeline`, at least for now.


> Implementing ranking predictions for ALS
> 
>
> Key: FLINK-4712
> URL: https://issues.apache.org/jira/browse/FLINK-4712
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Domokos Miklós Kelen
>Assignee: Gábor Hermann
>
> We started working on implementing ranking predictions for recommender 
> systems. Ranking prediction means that beside predicting scores for user-item 
> pairs, the recommender system is able to recommend a top K list for the users.
> Details:
> In practice, this would mean finding the K items for a particular user with 
> the highest predicted rating. It should be possible also to specify whether 
> to exclude the already seen items from a particular user's toplist. (See for 
> example the 'exclude_known' setting of [Graphlab Create's ranking 
> factorization 
> recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend]
>  ).
> The output of the topK recommendation function could be in the form of 
> {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab 
> Create's output. However, this is arguable: follow up work includes 
> implementing ranking recommendation evaluation metrics (such as precision@k, 
> recall@k, ndcg@k), similar to [Spark's 
> implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems].
>  It would be beneficial if we were able to design the API such that it could 
> be included in the proposed evaluation framework (see 
> [5157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it 
> necessary to consider the possible output type {{DataSet[(Int, 
> Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, 
> array of items), possibly including the predicted s

[GitHub] flink issue #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendation & e...

2016-11-24 Thread gaborhermann
Github user gaborhermann commented on the issue:

https://github.com/apache/flink/pull/2838
  
Thanks again for taking a look at our PR!

I've just realized from a developer mailing list thread that the FlinkML 
API is not set in stone until 2.0, and it's nice to hear that :)

The problem is not with `evaluate(test: TestType): DataSet[Double]` but 
rather with `evaluate(test: TestType): DataSet[(Prediction,Prediction)]`. It's 
at least confusing to have both, and it might not be worth exposing the one 
giving `(Prediction,Prediction)` pairs to the user, as it only *prepares* 
evaluation. When introducing the evaluation framework, we could at least rename 
it to something like `preparePairwiseEvaluation(test: TestType): 
DataSet[(Prediction,Prediction)]`. In the ranking case we might generalize it 
to `prepareEvaluation(test: TestType): PreparedTesting`. We basically did this 
with `PrepareDataSetOperation`; we've just left the old `evaluate` as it is 
for now. I suggest changing this if we can break the API.

I'll do a rebase on the cross-validation PR. At first glance, it should not 
really be a problem to do both cross-validation and hyper-parameter tuning, as 
the user has to provide a `Scorer` anyway. A minor issue I see is the user 
falling back to a default `score` (e.g. RMSE in the case of ALS). This might 
not be a problem for recommendation models that give rating predictions besides 
ranking predictions, but it is a problem for models that *only* give ranking 
predictions, because those do not extend the `Predictor` class. This is not an 
issue for now, but might become one when adding more recommendation models. 
Should we try to do this now, or is it a bit of "overengineering"? I'll see if 
any other problems come up after rebasing.

The `RankingPredictor` interface is useful *internally* for the `Score`s: 
it serves as a contract between a `RankingScore` and the model. I'm sure it 
will be used only for recommendations, but it's no effort to expose it, so the 
user can write code against a general `RankingPredictor` (although I don't 
think this is what users would like to do :) ). A better question is whether to 
use it in a `Pipeline`. We discussed this with some people and could not really 
find a use case where we would need `Transformer`-like preprocessing for 
recommendations. Of course, there could be other preprocessing steps, such as 
removing/aggregating duplicates, but those do not have to be `fit` to training 
data. Based on this, it's not worth the effort to integrate `RankingPredictor` 
with the `Pipeline`, at least for now.
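
A minimal sketch of the proposed split (hypothetical names, assuming a 
`PreparedTesting` type from the evaluation framework; not the final API):

{code}
import org.apache.flink.api.scala.DataSet

// Hypothetical sketch: separate score computation from pair preparation.
trait EvaluationOps {
  // Convenience method returning a single score, e.g. RMSE for ALS.
  def evaluate[Testing](test: DataSet[Testing]): DataSet[Double]

  // Only *prepares* evaluation by producing (truth, prediction) pairs.
  def preparePairwiseEvaluation[Testing, Prediction](
      test: DataSet[Testing]): DataSet[(Prediction, Prediction)]

  // Ranking generalization: prepare an arbitrary intermediate representation.
  def prepareEvaluation[Testing, PreparedTesting](
      test: DataSet[Testing]): PreparedTesting
}
{code}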


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2439: [FLINK-4450] update storm version to 1.0.0 in flink-storm a...

2016-11-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2439
  
Thanks!

You can squash commits by rebasing. Have a look here: 
http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5055) Security feature crashes JM for certain Hadoop versions even though using no Kerberos

2016-11-24 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-5055:
-

Assignee: Maximilian Michels

> Security feature crashes JM for certain Hadoop versions even though using no 
> Kerberos
> -
>
> Key: FLINK-5055
> URL: https://issues.apache.org/jira/browse/FLINK-5055
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.2.0
>
>
> A user reported [1] that the {{JobManager}} does not start when using Flink 
> with Hadoop-2.7.0-mapr-1607 and no security activated because of 
> {code}
> javax.security.auth.login.LoginException: Unable to obtain Principal Name for 
> authentication
> at 
> com.sun.security.auth.module.Krb5LoginModule.promptForName(Krb5LoginModule.java:841)
> at 
> com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:704)
> at 
> com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
> {code}
> It seems that this Hadoop version always tries to log in via Kerberos even 
> though the user did not activate it and thus should use 
> {{AuthenticationMode.SIMPLE}}.
> I'm not really familiar with the security feature, but my understanding is 
> that it should not have any effect on Flink when not activated. I might be 
> wrong here, but if not, then we should fix this problem for 1.2.0 because it 
> prevents people from using Flink.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-using-Yarn-on-MapR-td14484.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4450) update storm version to 1.0.0

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693353#comment-15693353
 ] 

ASF GitHub Bot commented on FLINK-4450:
---

Github user liuyuzhong commented on the issue:

https://github.com/apache/flink/pull/2439
  
Compile error is fixed. But how do I squash all the commits into one?


> update storm version to 1.0.0
> -
>
> Key: FLINK-4450
> URL: https://issues.apache.org/jira/browse/FLINK-4450
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: yuzhongliu
> Fix For: 2.0.0
>
>
> The Storm package path was changed in the new version.
> Storm old version package:
> backtype.storm.*
> Storm new version package:
> org.apache.storm.*
> Shall we update the flink/flink-storm code to the new Storm version?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2439: [FLINK-4450] update storm version to 1.0.0 in flink-storm a...

2016-11-24 Thread liuyuzhong
Github user liuyuzhong commented on the issue:

https://github.com/apache/flink/pull/2439
  
Compile error is fixed. But how do I squash all the commits into one?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5156) Consolidate streaming FieldAccessor functionality

2016-11-24 Thread JIRA
Márton Balassi created FLINK-5156:
-

 Summary: Consolidate streaming FieldAccessor functionality
 Key: FLINK-5156
 URL: https://issues.apache.org/jira/browse/FLINK-5156
 Project: Flink
  Issue Type: Task
Reporter: Márton Balassi


The streaming FieldAccessors (keyedStream.keyBy(...)) have slightly different 
semantics compared to their batch counterparts. 

Currently the streaming ones allow selecting a field within an array (which 
might be dangerous, as the array type info does not contain the length of the 
array and can thus lead to an index out of bounds) and accept not only "*" but 
also "0" to select a whole type.

This functionality should either be removed or documented. The latter can be 
achieved by effectively reverting [1]. Note that said commit was squashed 
before merging.

[1] 
https://github.com/mbalassi/flink/commit/237f07eb113508703c980b14587d66970e7f6251



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4997) Extending Window Function Metadata

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693232#comment-15693232
 ] 

ASF GitHub Bot commented on FLINK-4997:
---

Github user VenturaDelMonte commented on the issue:

https://github.com/apache/flink/pull/2756
  
@manuzhang thank you for your feedback and no problem!

@aljoscha 👍


> Extending Window Function Metadata
> --
>
> Key: FLINK-4997
> URL: https://issues.apache.org/jira/browse/FLINK-4997
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Streaming, Windowing Operators
>Reporter: Ventura Del Monte
>Assignee: Ventura Del Monte
> Fix For: 1.2.0
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2756: [FLINK-4997] Extending Window Function Metadata

2016-11-24 Thread VenturaDelMonte
Github user VenturaDelMonte commented on the issue:

https://github.com/apache/flink/pull/2756
  
@manuzhang thank you for your feedback and no problem!

@aljoscha 👍


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693171#comment-15693171
 ] 

ASF GitHub Bot commented on FLINK-3702:
---

Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094
  
Merging...


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.0.0
>Reporter: Robert Metzger
>Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar 
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2094: [FLINK-3702] Make FieldAccessors support nested field exp...

2016-11-24 Thread mbalassi
Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2094
  
Merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693161#comment-15693161
 ] 

ASF GitHub Bot commented on FLINK-4712:
---

Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89489117
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with 
WithParameters {
 */
   def evaluate[Testing, PredictionValue](
   testing: DataSet[Testing],
-  evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-  evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+  evaluateParameters: ParameterMap = ParameterMap.Empty)
+  (implicit evaluator: EvaluateDataSetOperation[Self, Testing, 
PredictionValue])
 : DataSet[(PredictionValue, PredictionValue)] = {
 FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
 evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+rankingPredictOperation.predictRankings(this, k, users, 
predictParameters)
+
+  def evaluateRankings(
+testing: DataSet[(Int,Int,Double)],
+evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+// todo: do not burn 100 topK into code
+predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+instance: Instance,
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training 
[[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+* Retrieving the training items.
+* Although this can be calculated from the training data, it requires 
a costly
+* [[DataSet.distinct]] operation, while in matrix factor models the 
set items could be
+* given more efficiently from the item factors.
+*/
+  def getTrainingItems: DataSet[Int] = {
+getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the 
predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: 
TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, 
Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], 
exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+users.cross(items)
--- End diff --

You're right. Although there's not much we can do generally to avoid this, 
we might be able to optimize for matrix factorization. This solution works for 
*every* predictor that predicts ratings, and we currently use it in ALS 
([here](https://github.com/apache/flink/pull/2838/files/45c98a97ef82d1012062dbcf6ade85a8d566062d#diff-80639a21b8fd166b5f7df5280cd609a9R467)).
 With a matrix factorization model *specifically*, we can avoid materializing 
all user-item pairs as tuples, and compute the rankings more directly, and that 
might be more efficient. So we could use a more specific `RankingPredictor` 
implementation in `ALS`. But even in that case, we still need to go through all 
the items for a particular user to calculate the top k items for that user.

Also, this is only calculated for the users we'd like to give rankings to, 
e.g. in a testing scenario, for the users in the test data, which might be 
significantly fewer than the users in the training data.

I suggest keeping this anyway, as it is general. We might come up with a 
solution that's slightly more efficient in most cases for MF models. Should we 
put effort into working on it? What do you think?


> Implementing ranking predictions for ALS
> 
>
> Key: FLINK

[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...

2016-11-24 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2838#discussion_r89489117
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
 ---
@@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with 
WithParameters {
 */
   def evaluate[Testing, PredictionValue](
   testing: DataSet[Testing],
-  evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-  evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+  evaluateParameters: ParameterMap = ParameterMap.Empty)
+  (implicit evaluator: EvaluateDataSetOperation[Self, Testing, 
PredictionValue])
 : DataSet[(PredictionValue, PredictionValue)] = {
 FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
 evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
+trait RankingPredictor[Self] extends Estimator[Self] with WithParameters {
+  that: Self =>
+
+  def predictRankings(
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] =
+rankingPredictOperation.predictRankings(this, k, users, 
predictParameters)
+
+  def evaluateRankings(
+testing: DataSet[(Int,Int,Double)],
+evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+rankingPredictOperation : RankingPredictOperation[Self])
+  : DataSet[(Int,Int,Int)] = {
+// todo: do not burn 100 topK into code
+predictRankings(100, testing.map(_._1).distinct(), evaluateParameters)
+  }
+}
+
+trait RankingPredictOperation[Instance] {
+  def predictRankings(
+instance: Instance,
+k: Int,
+users: DataSet[Int],
+predictParameters: ParameterMap = ParameterMap.Empty)
+  : DataSet[(Int, Int, Int)]
+}
+
+/**
+  * Trait for providing auxiliary data for ranking evaluations.
+  *
+  * They are useful e.g. for excluding items found in the training 
[[DataSet]]
+  * from the recommended top K items.
+  */
+trait TrainingRatingsProvider {
+
+  def getTrainingData: DataSet[(Int, Int, Double)]
+
+  /**
+* Retrieving the training items.
+* Although this can be calculated from the training data, it requires 
a costly
+* [[DataSet.distinct]] operation, while in matrix factor models the 
set items could be
+* given more efficiently from the item factors.
+*/
+  def getTrainingItems: DataSet[Int] = {
+getTrainingData.map(_._2).distinct()
+  }
+}
+
+/**
+  * Ranking predictions for the most common case.
+  * If we can predict ratings, we can compute top K lists by sorting the 
predicted ratings.
+  */
+class RankingFromRatingPredictOperation[Instance <: 
TrainingRatingsProvider]
+(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, 
Int, Double)])
+  extends RankingPredictOperation[Instance] {
+
+  private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], 
exclude: DataSet[(Int, Int)])
+  : DataSet[(Int, Int)] = {
+users.cross(items)
--- End diff --

You're right. Although there's not much we can do generally to avoid this, 
we might be able to optimize for matrix factorization. This solution works for 
*every* predictor that predicts ratings, and we currently use it in ALS 
([here](https://github.com/apache/flink/pull/2838/files/45c98a97ef82d1012062dbcf6ade85a8d566062d#diff-80639a21b8fd166b5f7df5280cd609a9R467)).
 With a matrix factorization model *specifically*, we can avoid materializing 
all user-item pairs as tuples, and compute the rankings more directly, and that 
might be more efficient. So we could use a more specific `RankingPredictor` 
implementation in `ALS`. But even in that case, we still need to go through all 
the items for a particular user to calculate the top k items for that user.

Also, this is only calculated for the users we'd like to give rankings to, 
e.g. in a testing scenario, for the users in the test data, which might be 
significantly fewer than the users in the training data.

I suggest keeping this anyway, as it is general. We might come up with a 
solution that's slightly more efficient in most cases for MF models. Should we 
put effort into working on it? What do you think?
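
For illustration, the MF-specific idea could look roughly like this (a 
hypothetical helper, assuming the user and item factor vectors are at hand; 
not the PR's implementation):

{code}
// Sketch: rank items for one user directly from factor vectors, without
// materializing the user-item pairs as a DataSet. Still O(#items) per user.
def topKForUser(
    userFactor: Array[Double],
    itemFactors: Seq[(Int, Array[Double])],
    k: Int): Seq[(Int, Double)] = {
  itemFactors
    .map { case (item, itemFactor) =>
      // Predicted rating = dot product of the two factor vectors.
      val score = (userFactor, itemFactor).zipped.map(_ * _).sum
      (item, score)
    }
    .sortBy(-_._2)
    .take(k)
}
{code}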


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693124#comment-15693124
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89482516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  val rowCnt = metadata.getRowCount(child)
+  val rowSize = this.estimateRowSize(child.getRowType)
+  cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val broadcastSetName = "joinSet"
+val mapSideJoin = generateMapFunction(
+  tableEnv.getConfig,
+  leftDataSet.getType,
+  rightDataSet.getType,
+  leftIsSingle,
+  broadcastSetName,
+  expectedType)
+
+val (multiRowDataSet, singleRowDataSet) =
+  if (leftIsSingle) {
+(rightDataSet, leftDataSet)
+  } else {
+(leftDataSet, rightDataSet)
+  }
+
+multiRowDataSet
+  .map(mapSideJoin)
+  .withBroadcastSet(singleRowDataSet, broadcastSetName)
+  .name(getMapOperatorName)

[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89461812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isCrossJoin(join: LogicalJoin) = {
+val joinCondition = join.analyzeCondition
+joinCondition.isEqui && joinCondition.pairs().isEmpty
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+node.isInstanceOf[LogicalAggregate] &&
+  isSingleLine(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleLine(agg: LogicalAggregate) = {
+agg.getGroupSets.size() == 1 &&
--- End diff --

I saw in a comment in Calcite that `groupSets` may be null. 
For safety, we should change the condition to `groupSets == null || 
(groupSets.size() == 1 && groupSets.get(0).isEmpty)`
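
Applied to `isSingleLine`, the defensive variant might look like this (a 
sketch of the suggested condition):

{code}
private def isSingleLine(agg: LogicalAggregate) = {
  // Calcite notes that groupSets may be null; treat that as a global aggregate.
  agg.getGroupSets == null ||
    (agg.getGroupSets.size() == 1 && agg.getGroupSets.get(0).isEmpty)
}
{code}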


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693131#comment-15693131
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483428
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
--- End diff --

I think we need two map join runner classes, one for each side of the 
broadcasted value. Could be named `MapJoinLeftRunner` and `MapJoinRightRunner`.
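
A minimal sketch of such a pair (hypothetical; assuming the broadcast set 
holds the single row and a plain join function rather than the code-generated 
one from the PR):

{code}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch: the broadcasted single row is the *left* join input.
class MapJoinLeftRunner[L, R, OUT](join: (L, R) => OUT, broadcastSetName: String)
  extends RichMapFunction[R, OUT] {

  private var single: L = _

  override def open(parameters: Configuration): Unit = {
    // The single-row side arrives via the broadcast set.
    single = getRuntimeContext.getBroadcastVariable[L](broadcastSetName).get(0)
  }

  override def map(multi: R): OUT = join(single, multi)
}

// Sketch: the broadcasted single row is the *right* join input.
class MapJoinRightRunner[L, R, OUT](join: (L, R) => OUT, broadcastSetName: String)
  extends RichMapFunction[L, OUT] {

  private var single: R = _

  override def open(parameters: Configuration): Unit = {
    single = getRuntimeContext.getBroadcastVariable[R](broadcastSetName).get(0)
  }

  override def map(multi: L): OUT = join(multi, single)
}
{code}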


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89486698
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
--- End diff --

We need to check that the `LogicalJoin.joinType` is `INNER`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693125#comment-15693125
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89480767
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  val rowCnt = metadata.getRowCount(child)
+  val rowSize = this.estimateRowSize(child.getRowType)
+  cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val broadcastSetName = "joinSet"
+val mapSideJoin = generateMapFunction(
+  tableEnv.getConfig,
+  leftDataSet.getType,
+  rightDataSet.getType,
+  leftIsSingle,
+  broadcastSetName,
+  expectedType)
+
+val (multiRowDataSet, singleRowDataSet) =
+  if (leftIsSingle) {
+(rightDataSet, leftDataSet)
+  } else {
+(leftDataSet, rightDataSet)
+  }
+
+multiRowDataSet
+  .map(mapSideJoin)
+  .withBroadcastSet(singleRowDataSet, broadcastSetName)
+  .name(getMapOperatorName)

[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89480684
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  val rowCnt = metadata.getRowCount(child)
+  val rowSize = this.estimateRowSize(child.getRowType)
+  cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val broadcastSetName = "joinSet"
+val mapSideJoin = generateMapFunction(
+  tableEnv.getConfig,
+  leftDataSet.getType,
+  rightDataSet.getType,
+  leftIsSingle,
+  broadcastSetName,
+  expectedType)
+
+val (multiRowDataSet, singleRowDataSet) =
+  if (leftIsSingle) {
+(rightDataSet, leftDataSet)
+  } else {
+(leftDataSet, rightDataSet)
+  }
+
+multiRowDataSet
+  .map(mapSideJoin)
+  .withBroadcastSet(singleRowDataSet, broadcastSetName)
+  .name(getMapOperatorName)
+  .asInstanceOf[DataSet[Any]]
+  }
+
+  private def generateMapFunction(
--- End diff --

Can we include the join condition here as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89480767
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  val rowCnt = metadata.getRowCount(child)
+  val rowSize = this.estimateRowSize(child.getRowType)
+  cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val broadcastSetName = "joinSet"
+val mapSideJoin = generateMapFunction(
+  tableEnv.getConfig,
+  leftDataSet.getType,
+  rightDataSet.getType,
+  leftIsSingle,
+  broadcastSetName,
+  expectedType)
+
+val (multiRowDataSet, singleRowDataSet) =
+  if (leftIsSingle) {
+(rightDataSet, leftDataSet)
+  } else {
+(leftDataSet, rightDataSet)
+  }
+
+multiRowDataSet
+  .map(mapSideJoin)
+  .withBroadcastSet(singleRowDataSet, broadcastSetName)
+  .name(getMapOperatorName)
+  .asInstanceOf[DataSet[Any]]
+  }
+
+  private def generateMapFunction(
+  config: TableConfig,
+  inputType1: TypeInformation[Any],
+  inputType2: TypeInformation[Any],
+  firstIsSingle: Boolean,
+

[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693126#comment-15693126
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483310
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
--- End diff --

Add the join condition.


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693132#comment-15693132
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89461812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isCrossJoin(join: LogicalJoin) = {
+val joinCondition = join.analyzeCondition
+joinCondition.isEqui && joinCondition.pairs().isEmpty
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+node.isInstanceOf[LogicalAggregate] &&
+  isSingleLine(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleLine(agg: LogicalAggregate) = {
+agg.getGroupSets.size() == 1 &&
--- End diff --

I saw in a comment in Calcite that `groupSets` may be null. 
For safety, we should change the condition to `groupSets == null || 
(groupSets.size() == 1 && groupSets.get(0).isEmpty)`
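
For illustration, a null-safe variant of `isSingleLine` along those lines 
might look like this (a sketch only, assuming `getGroupSets` can indeed 
return null as the Calcite comment suggests):

{code}
// Sketch: null-safe check that the aggregate produces exactly one row.
private def isSingleLine(agg: LogicalAggregate): Boolean = {
  val groupSets = agg.getGroupSets
  (groupSets == null ||
    (groupSets.size() == 1 && groupSets.get(0).isEmpty)) &&
  agg.getGroupSet.isEmpty
}
{code}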


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693120#comment-15693120
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483891
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+name: String,
+code: String,
+@transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
--- End diff --

extend from `RichFlatMapFunction`. `DataSetJoin` uses a `FlatJoinFunction` 
which emits records via a `Collector`. Wrapping this `FlatJoinFunction` will be 
easier from a `FlatMapFunction` than from a `MapFunction`.
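
As a rough illustration of that point, a `FlatJoinFunction` wraps naturally 
into a `RichFlatMapFunction` because both emit through a `Collector` (class 
and field names below are hypothetical, not from the PR):

{code}
import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Sketch: map over the multi-row side, read the single row from a
// broadcast set in open(), and let the wrapped FlatJoinFunction emit
// its results through the Collector.
class SingleRowJoinRunner[MULTI, SINGLE, OUT](
    joinFunction: FlatJoinFunction[MULTI, SINGLE, OUT],
    broadcastSetName: String)
  extends RichFlatMapFunction[MULTI, OUT] {

  private var singleRow: SINGLE = _

  override def open(parameters: Configuration): Unit = {
    // by construction the broadcast set holds exactly one record
    singleRow = getRuntimeContext
      .getBroadcastVariable[SINGLE](broadcastSetName)
      .get(0)
  }

  override def flatMap(record: MULTI, out: Collector[OUT]): Unit = {
    joinFunction.join(record, singleRow, out)
  }
}
{code}

A `MapFunction`, by contrast, must return exactly one output record per 
input, which does not fit a join that may filter records out.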


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89465992
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
--- End diff --

Rename to `DataSetSingleRowJoin`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483310
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
--- End diff --

Add the join condition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693128#comment-15693128
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89466031
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
--- End diff --

Rename to `DataSetSingleRowJoinRule`


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693119#comment-15693119
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89486698
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
--- End diff --

We need to check that the `LogicalJoin.joinType` is `INNER`.
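
For example, a guard along these lines could be added to `matches` (a 
sketch, assuming Calcite's `JoinRelType` enum):

{code}
import org.apache.calcite.rel.core.JoinRelType

// Sketch: only inner joins qualify for the single-row broadcast join.
private def isInnerJoin(join: LogicalJoin): Boolean =
  join.getJoinType == JoinRelType.INNER
{code}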


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483428
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
--- End diff --

I think we need two map join runner classes, one for each side of the 
broadcasted value. Could be named `MapJoinLeftRunner` and `MapJoinRightRunner`.
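
A possible shape for the two classes (a sketch under the assumption that 
the names refer to the side the broadcast row fills in the join call; all 
details hypothetical):

{code}
import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Sketch: the runners differ only in which join argument receives the
// broadcast single row.
class MapJoinLeftRunner[L, R, OUT](
    joinFun: FlatJoinFunction[L, R, OUT], bcName: String)
  extends RichFlatMapFunction[R, OUT] {
  private var left: L = _
  override def open(c: Configuration): Unit =
    left = getRuntimeContext.getBroadcastVariable[L](bcName).get(0)
  override def flatMap(right: R, out: Collector[OUT]): Unit =
    joinFun.join(left, right, out) // broadcast row as left input
}

class MapJoinRightRunner[L, R, OUT](
    joinFun: FlatJoinFunction[L, R, OUT], bcName: String)
  extends RichFlatMapFunction[L, OUT] {
  private var right: R = _
  override def open(c: Configuration): Unit =
    right = getRuntimeContext.getBroadcastVariable[R](bcName).get(0)
  override def flatMap(left: L, out: Collector[OUT]): Unit =
    joinFun.join(left, right, out) // broadcast row as right input
}
{code}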


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693123#comment-15693123
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89465952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
--- End diff --

Update to `Flink RelNode that executes a Join where one input is a single 
row.`


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693118#comment-15693118
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483366
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isCrossJoin(join: LogicalJoin) = {
+val joinCondition = join.analyzeCondition
+joinCondition.isEqui && joinCondition.pairs().isEmpty
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+node.isInstanceOf[LogicalAggregate] &&
+  isSingleLine(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleLine(agg: LogicalAggregate) = {
+agg.getGroupSets.size() == 1 &&
+agg.getGroupSets.get(0).isEmpty &&
+agg.getGroupSet.isEmpty
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val join = rel.asInstanceOf[LogicalJoin]
+val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+val dataSetLeftNode = RelOptRule.convert(join.getLeft, 
DataSetConvention.INSTANCE)
+val dataSetRightNode = RelOptRule.convert(join.getRight, 
DataSetConvention.INSTANCE)
+val leftIsSingle = 
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+new DataSetSingleRowCross(
--- End diff --

Add the join condition.


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693121#comment-15693121
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89480684
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  val rowCnt = metadata.getRowCount(child)
+  val rowSize = this.estimateRowSize(child.getRowType)
+  cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val broadcastSetName = "joinSet"
+val mapSideJoin = generateMapFunction(
+  tableEnv.getConfig,
+  leftDataSet.getType,
+  rightDataSet.getType,
+  leftIsSingle,
+  broadcastSetName,
+  expectedType)
+
+val (multiRowDataSet, singleRowDataSet) =
+  if (leftIsSingle) {
+(rightDataSet, leftDataSet)
+  } else {
+(leftDataSet, rightDataSet)
+  }
+
+multiRowDataSet
+  .map(mapSideJoin)
+  .withBroadcastSet(singleRowDataSet, broadcastSetName)
+  .name(getMapOperatorName)

[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483366
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isCrossJoin(join: LogicalJoin) = {
+val joinCondition = join.analyzeCondition
+joinCondition.isEqui && joinCondition.pairs().isEmpty
+  }
+
+  private def isGlobalAggregation(node: RelNode) = {
+node.isInstanceOf[LogicalAggregate] &&
+  isSingleLine(node.asInstanceOf[LogicalAggregate])
+  }
+
+  private def isSingleLine(agg: LogicalAggregate) = {
+agg.getGroupSets.size() == 1 &&
+agg.getGroupSets.get(0).isEmpty &&
+agg.getGroupSet.isEmpty
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+val join = rel.asInstanceOf[LogicalJoin]
+val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
+val dataSetLeftNode = RelOptRule.convert(join.getLeft, 
DataSetConvention.INSTANCE)
+val dataSetRightNode = RelOptRule.convert(join.getRight, 
DataSetConvention.INSTANCE)
+val leftIsSingle = 
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+
+new DataSetSingleRowCross(
--- End diff --

Add the join condition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89462392
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+name: String,
+code: String,
+@transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[RichMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: RichMapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
--- End diff --

Can you also forward the `RichFunction.close()` call?
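
For instance (a minimal sketch in the style of the surrounding runner):

{code}
// Sketch: forward both lifecycle calls to the compiled function.
override def open(parameters: Configuration): Unit = {
  // ... compile and instantiate `function` as before ...
  function.open(parameters)
}

override def close(): Unit = {
  if (function != null) {
    function.close()
  }
}
{code}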


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693127#comment-15693127
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89462392
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+name: String,
+code: String,
+@transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[RichMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: RichMapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
--- End diff --

Can you also forward the `RichFunction.close()` call?


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693122#comment-15693122
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89461025
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isCrossJoin(join: LogicalJoin) = {
+val joinCondition = join.analyzeCondition
+joinCondition.isEqui && joinCondition.pairs().isEmpty
--- End diff --

Actually, I think we can remove the cross-join condition completely and also
execute equi-joins with a single-row input as a Map-Broadcast join. This
execution is very lightweight and should outperform the sort- and hash-based
strategies used for equi-joins.

We need to evaluate the condition in the MapFunction though.
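
A hedged sketch of what evaluating the condition in the map-side function 
could look like; `predicate` and `combine` stand in for code-generated 
logic and are purely illustrative:

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

// Sketch: emit a joined record only when the join predicate holds for
// the incoming row and the single row (assumed provided, e.g. read
// from a broadcast set as in the surrounding code).
class ConditionalSingleRowJoin[L, R, OUT](
    predicate: (L, R) => Boolean,
    combine: (L, R) => OUT,
    singleRow: R)
  extends RichFlatMapFunction[L, OUT] {

  override def flatMap(left: L, out: Collector[OUT]): Unit =
    if (predicate(left, singleRow)) {
      out.collect(combine(left, singleRow))
    }
}
{code}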


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89480259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
--- End diff --

I would compute the costs only for the large input. This way the costs are 
lower than those of `DataSetJoin`, and the `DataSetSingleRowJoin` is 
preferred.
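
Roughly, and assuming `leftIsSingle` identifies the single-row side as in the 
class above, something like:

{code}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
  // only the large input contributes to the cost of the broadcast join
  val child = if (leftIsSingle) this.getRight else this.getLeft
  val rowCnt = metadata.getRowCount(child)
  val rowSize = this.estimateRowSize(child.getRowType)
  planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
}
{code}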


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89482516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
+  val rowCnt = metadata.getRowCount(child)
+  val rowSize = this.estimateRowSize(child.getRowType)
+  cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+}
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+val leftDataSet = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+val broadcastSetName = "joinSet"
+val mapSideJoin = generateMapFunction(
+  tableEnv.getConfig,
+  leftDataSet.getType,
+  rightDataSet.getType,
+  leftIsSingle,
+  broadcastSetName,
+  expectedType)
+
+val (multiRowDataSet, singleRowDataSet) =
+  if (leftIsSingle) {
+(rightDataSet, leftDataSet)
+  } else {
+(leftDataSet, rightDataSet)
+  }
+
+multiRowDataSet
+  .map(mapSideJoin)
+  .withBroadcastSet(singleRowDataSet, broadcastSetName)
+  .name(getMapOperatorName)
+  .asInstanceOf[DataSet[Any]]
+  }
+
+  private def generateMapFunction(
--- End diff --

Actually, I think it is easier and requires less additional code to 
generate a regular `JoinFunction` as done in the `DataSetJoin` and wrap it in 
the map runner.

[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89483891
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class RichMapRunner[IN, OUT](
+name: String,
+code: String,
+@transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
--- End diff --

Extend from `RichFlatMapFunction`. `DataSetJoin` uses a `FlatJoinFunction`, 
which emits records via a `Collector`; wrapping this `FlatJoinFunction` is 
easier from a `FlatMapFunction` than from a `MapFunction`.
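
As a rough sketch (class and broadcast-set names are illustrative, not the 
actual runner):

{code}
import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class SingleRowJoinRunner[L, R, OUT](joinFunction: FlatJoinFunction[L, R, OUT])
  extends RichFlatMapFunction[L, OUT] {

  private var singleRow: R = _

  override def open(parameters: Configuration): Unit = {
    singleRow = getRuntimeContext.getBroadcastVariable[R]("singleRow").get(0)
  }

  override def flatMap(value: L, out: Collector[OUT]): Unit = {
    // the wrapped FlatJoinFunction evaluates the join condition and emits
    // zero or one record via the Collector, which a MapFunction cannot do
    joinFunction.join(value, singleRow, out)
  }
}
{code}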


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693129#comment-15693129
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89480259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+leftNode: RelNode,
+rightNode: RelNode,
+leftIsSingle: Boolean,
+rowRelDataType: RelDataType,
+joinRowType: RelDataType,
+ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataSetSingleRowCross(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  leftIsSingle,
+  getRowType,
+  joinRowType,
+  ruleDescription)
+  }
+
+  override def toString: String = {
+s"$joinTypeToString(where: ($joinConditionToString), join: 
($joinSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .item("where", joinConditionToString)
+  .item("join", joinSelectionToString)
+  .item("joinType", joinTypeToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+val children = this.getInputs
+children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) =>
--- End diff --

I would compute the costs only for the large input. This way the costs are 
lower than those of `DataSetJoin`, and the `DataSetSingleRowJoin` is 
preferred.


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}

[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89466031
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
--- End diff --

Rename to `DataSetSingleRowJoinRule`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693130#comment-15693130
 ] 

ASF GitHub Bot commented on FLINK-4541:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89465992
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
+  */
+class DataSetSingleRowCross(
--- End diff --

Rename to `DataSetSingleRowJoin`


> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> // register the DataSet as table "WordCount"
> tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
> tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 
> 'frequency).select('word).filter('word !== "hello"))
> // run a SQL query on the Table and retrieve the result as a new Table
> val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE 
> word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
> table.toDataSet[WC].print()
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89461025
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin}
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSingleRowCross}
+
+class DataSetSingleRowCrossRule
+  extends ConverterRule(
+  classOf[LogicalJoin],
+  Convention.NONE,
+  DataSetConvention.INSTANCE,
+  "DataSetSingleRowCrossRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val join = call.rel(0).asInstanceOf[LogicalJoin]
+
+if (isCrossJoin(join)) {
+  
isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
+
isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal)
+} else {
+  false
+}
+  }
+
+  private def isCrossJoin(join: LogicalJoin) = {
+val joinCondition = join.analyzeCondition
+joinCondition.isEqui && joinCondition.pairs().isEmpty
--- End diff --

Actually, I think we can remove the cross join condition completely and 
also execute equi-joins with a single-row input as a Map-Broadcast join. This 
execution is very lightweight and should outperform the sort- and hash-based 
strategies used for equi-joins.

We need to evaluate the condition in the MapFunction though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator

2016-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2811#discussion_r89465952
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.RichMapRunner
+import 
org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with CrossOperator.
--- End diff --

Update to `Flink RelNode that executes a Join where one input is a single 
row.`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5154) Duplicate TypeSerializer when writing RocksDB Snapshot

2016-11-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5154.
---
Resolution: Not A Problem

> Duplicate TypeSerializer when writing RocksDB Snapshot
> --
>
> Key: FLINK-5154
> URL: https://issues.apache.org/jira/browse/FLINK-5154
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.2.0, 1.1.4
>
>
> Some {{TypeSerializers}} are not thread-safe (for example {{KryoSerializer}}), 
> so we have to {{duplicate()}} them when they are used concurrently, as happens 
> when performing a RocksDB snapshot.
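
The required pattern, as a minimal sketch (assuming some 
{{serializer: TypeSerializer[T]}} owned by the operator):

{code}
// the snapshot thread must never share the operator's serializer instance;
// it works on its own duplicate instead
val snapshotSerializer: TypeSerializer[T] = serializer.duplicate()

// for stateless serializers duplicate() may simply return `this`, so the
// call is cheap; for stateful ones like KryoSerializer it returns a fresh,
// independently usable copy
{code}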



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5155) Deprecate ValueStateDescriptor constructors with default value

2016-11-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5155:

Description: 
Having the default value in the descriptor is problematic with some serialisers, 
and we don't lose a feature by removing it because users can always check for 
the null value and initialise with their own default value if necessary. Right 
now, we're always forcing people to specify a default value even though they 
don't need one.

Of course, we should add constructors without a default value.

  was:Having the default value in the descriptor is problematic with some 
serialisers, and we don't lose a feature by removing it because users can 
always check for the null value and initialise with their own default value if 
necessary. Right now, we're always forcing people to specify a default value 
even though they don't need one.
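
For illustration, a sketch of the user-side pattern described above (using the 
proposed constructor without a default value; state name and type are made up):

{code}
val countState = getRuntimeContext.getState(
  new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))

// no descriptor default: value() returns null for an uninitialised key,
// and the user substitutes their own default
val previous = countState.value()
val count = if (previous == null) 0L else previous.longValue()
countState.update(count + 1)
{code}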


> Deprecate ValueStateDescriptor constructors with default value
> --
>
> Key: FLINK-5155
> URL: https://issues.apache.org/jira/browse/FLINK-5155
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Having the default value in the descriptor is problematic with some 
> serialisers, and we don't lose a feature by removing it because users can 
> always check for the null value and initialise with their own default value 
> if necessary. Right now, we're always forcing people to specify a default 
> value even though they don't need one.
> Of course, we should add constructors without a default value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5155) Deprecate ValueStateDescriptor constructors with default value

2016-11-24 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5155:
---

 Summary: Deprecate ValueStateDescriptor constructors with default 
value
 Key: FLINK-5155
 URL: https://issues.apache.org/jira/browse/FLINK-5155
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek


Having the default value in the descriptor is problematic with some serialisers, 
and we don't lose a feature by removing it because users can always check for 
the null value and initialise with their own default value if necessary. Right 
now, we're always forcing people to specify a default value even though they 
don't need one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts

2016-11-24 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2664
  
I thought that transitive dependencies are resolved in the scope of 
assembly descriptors. But I'm not so sure about that anymore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4861) Package optional project artifacts

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693038#comment-15693038
 ] 

ASF GitHub Bot commented on FLINK-4861:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2664
  
I thought that transitive dependencies are resolved in the scope of 
assembly descriptors. But I'm not so sure about that anymore.


> Package optional project artifacts
> --
>
> Key: FLINK-4861
> URL: https://issues.apache.org/jira/browse/FLINK-4861
> Project: Flink
>  Issue Type: New Feature
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Per the mailing list 
> [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html],
>  package the Flink libraries and connectors into subdirectories of a new 
> {{opt}} directory in the release/snapshot tarballs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5154) Duplicate TypeSerializer when writing RocksDB Snapshot

2016-11-24 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5154:
---

 Summary: Duplicate TypeSerializer when writing RocksDB Snapshot
 Key: FLINK-5154
 URL: https://issues.apache.org/jira/browse/FLINK-5154
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Stefan Richter
Priority: Blocker
 Fix For: 1.2.0, 1.1.4


Some {{TypeSerializers}} are not thread-safe (for example {{KryoSerializer}}), 
so we have to {{duplicate()}} them when they are used concurrently, as happens 
when performing a RocksDB snapshot.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693016#comment-15693016
 ] 

ASF GitHub Bot commented on FLINK-5096:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845
  
Thanks for the review @zentol! I integrated your comments and rebased onto 
master.
Let's see what Travis has to say, and then merge it if you have no further 
comments ;)


> Make the RollingSink rescalable.
> 
>
> Key: FLINK-5096
> URL: https://issues.apache.org/jira/browse/FLINK-5096
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Integrate the RollingSink with the new state abstractions so that its 
> parallelism can change after restoring from a savepoint.
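
For illustration, a rough sketch of the intended pattern with the new operator 
state abstractions (the state name is made up, and method names may differ 
from the final API): the sink keeps its pending work in operator list state, 
which Flink redistributes when the parallelism changes.

{code}
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

class RescalableSinkState extends CheckpointedFunction {

  @transient private var checkpointedFiles: ListState[String] = _
  private val pendingFiles = new java.util.ArrayList[String]()

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // write the subtask's pending files into the redistributable list state
    checkpointedFiles.clear()
    val it = pendingFiles.iterator()
    while (it.hasNext) {
      checkpointedFiles.add(it.next())
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    checkpointedFiles =
      context.getOperatorStateStore.getSerializableListState("pending-files")
    // on restore, the entries are redistributed across the new parallelism
    // and each subtask re-populates its local view
    val it = checkpointedFiles.get().iterator()
    while (it.hasNext) {
      pendingFiles.add(it.next())
    }
  }
}
{code}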



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2845: [FLINK-5096] Make the RollingSink rescalable.

2016-11-24 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2845
  
Thanks for the review @zentol! I integrated your comments and rebased onto 
master.
Let's see what Travis has to say, and then merge it if you have no further 
comments ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4565) Support for SQL IN operator

2016-11-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693008#comment-15693008
 ] 

Fabian Hueske commented on FLINK-4565:
--

Great! Looking forward to it.

> Support for SQL IN operator
> ---
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Nikolay Vasilishin
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator. But 
> it should also be available in the Table API and tested.
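
For reference, the SQL form that already appears to work, and a sketch of the 
Table API counterpart the issue asks for (the `in` expression below is 
hypothetical until it is implemented):

{code}
// uncorrelated sub-query IN in SQL (works today, per the description):
val result = tEnv.sql(
  "SELECT word, SUM(frequency) FROM WordCount " +
  "WHERE word IN (SELECT word FROM WordCount2) GROUP BY word")

// hypothetical Table API counterpart to be added and tested:
// wordCount.where('word.in(wordCount2.select('word)))
{code}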



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2856: Removed excessive tests.

2016-11-24 Thread chermenin
Github user chermenin commented on the issue:

https://github.com/apache/flink/pull/2856
  
I removed a method from the ITCase only. There is a `testJavaArraysAsList` 
method in the `KryoCollectionsSerializerTest` class to check it. Yep, before 
merging #2623 they were not successful.
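
For reference, a minimal sketch of such a round-trip check (serialization 
helpers from `InstantiationUtil`; the concrete test in the PR may differ):

{code}
import java.util.{Arrays => JArrays, List => JList}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.util.InstantiationUtil

val serializer =
  new KryoSerializer[JList[String]](classOf[JList[String]], new ExecutionConfig)

// Arrays.asList returns the private Arrays$ArrayList type, which Kryo
// cannot handle without a dedicated registration
val original: JList[String] = JArrays.asList("a", "b", "c")
val bytes = InstantiationUtil.serializeToByteArray(serializer, original)
val copy = InstantiationUtil.deserializeFromByteArray(serializer, bytes)
assert(copy == original)
{code}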


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

2016-11-24 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15692971#comment-15692971
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5075:


Resolved for {{master}} via 
http://git-wip-us.apache.org/repos/asf/flink/commit/f5f4f7a
Resolved for {{release-1.1}} via 
http://git-wip-us.apache.org/repos/asf/flink/commit/b9e6dcc

> Kinesis consumer incorrectly determines shards as newly discovered when 
> tested against Kinesalite
> -
>
> Key: FLINK-5075
> URL: https://issues.apache.org/jira/browse/FLINK-5075
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.4
>
>
> A user reported that when our Kinesis connector is used against Kinesalite 
> (https://github.com/mhart/kinesalite), we're incorrectly determining already 
> found shards as newly discovered:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html
> I suspect the problem is that Kinesalite's mock Kinesis API implementation 
> doesn't completely match the official AWS Kinesis behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

2016-11-24 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5075:
---
Fix Version/s: 1.1.4

> Kinesis consumer incorrectly determines shards as newly discovered when 
> tested against Kinesalite
> -
>
> Key: FLINK-5075
> URL: https://issues.apache.org/jira/browse/FLINK-5075
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.4
>
>
> A user reported that when our Kinesis connector is used against Kinesalite 
> (https://github.com/mhart/kinesalite), we're incorrectly determining already 
> found shards as newly discovered:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html
> I suspect the problem is that Kinesalite's mock Kinesis API implementation 
> doesn't completely match the official AWS Kinesis behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

2016-11-24 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5075.

Resolution: Fixed

> Kinesis consumer incorrectly determines shards as newly discovered when 
> tested against Kinesalite
> -
>
> Key: FLINK-5075
> URL: https://issues.apache.org/jira/browse/FLINK-5075
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.4
>
>
> A user reported that when our Kinesis connector is used against Kinesalite 
> (https://github.com/mhart/kinesalite), we're incorrectly determining already 
> found shards as newly discovered:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html
> I suspect the problem is that Kinesalite's mock Kinesis API implementation 
> doesn't completely match the official AWS Kinesis behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite

2016-11-24 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5075:
---
Fix Version/s: 1.2.0

> Kinesis consumer incorrectly determines shards as newly discovered when 
> tested against Kinesalite
> -
>
> Key: FLINK-5075
> URL: https://issues.apache.org/jira/browse/FLINK-5075
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0, 1.1.4
>
>
> A user reported that when our Kinesis connector is used against Kinesalite 
> (https://github.com/mhart/kinesalite), we're incorrectly determining already 
> found shards as newly discovered:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html
> I suspect the problem is that Kinesalite's mock Kinesis API implementation 
> doesn't completely match the official AWS Kinesis behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

