[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695133#comment-15695133 ]

Alexander Shoshin commented on FLINK-3133:
--

[~mxm], hi. Can you answer my previous question please? It seems that you missed it )

> Introduce collect()/count()/print() methods in DataStream API
> -
>
> Key: FLINK-3133
> URL: https://issues.apache.org/jira/browse/FLINK-3133
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Streaming
> Affects Versions: 0.10.0, 1.0.0, 0.10.1
> Reporter: Maximilian Michels
> Assignee: Alexander Shoshin
> Fix For: 1.0.0
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should
> be mirrored to the DataStream API.
> The semantics of the calls are different. We need to be able to sample parts
> of a stream, e.g. by supplying a time period in the arguments to the methods.
> Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable sampled = jobClient.sampleStream(streamData,
>     Time.seconds(5));
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
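The sampling semantics proposed in the ticket (collect whatever arrives during a fixed time window and hand the snapshot to the caller) can be modeled with a small, self-contained sketch. All names below (`WindowedSampler`, the injected clock) are hypothetical illustrations, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Toy model of the proposed jobClient.sampleStream(stream, Time.seconds(5))
 * semantics: keep every element observed during a fixed time window and
 * expose a snapshot to the caller. Hypothetical names, not the Flink API.
 */
class WindowedSampler<T> {
    private final long windowMillis;
    private final LongSupplier clock;        // injectable clock for deterministic testing
    private final List<T> buffer = new ArrayList<>();
    private long windowStart = -1;

    WindowedSampler(long windowMillis, LongSupplier clock) {
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    /** Called per stream element; retains it only while the window is open. */
    synchronized void onElement(T element) {
        long now = clock.getAsLong();
        if (windowStart < 0) {
            windowStart = now;
        }
        if (now - windowStart < windowMillis) {
            buffer.add(element);
        }
    }

    /** Snapshot of the elements sampled so far. */
    synchronized List<T> sample() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        long[] fakeTime = {0};
        WindowedSampler<String> sampler = new WindowedSampler<>(5000, () -> fakeTime[0]);
        sampler.onElement("a");          // t = 0, inside the 5 s window
        fakeTime[0] = 4999;
        sampler.onElement("b");          // still inside the window
        fakeTime[0] = 5000;
        sampler.onElement("c");          // window closed, dropped
        if (!sampler.sample().equals(List.of("a", "b"))) {
            throw new AssertionError("unexpected sample: " + sampler.sample());
        }
        System.out.println("sampled = " + sampler.sample());
    }
}
```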
[jira] [Commented] (FLINK-5002) Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers
[ https://issues.apache.org/jira/browse/FLINK-5002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695114#comment-15695114 ]

ASF GitHub Bot commented on FLINK-5002:
---

GitHub user MayerRoman opened a pull request:

    https://github.com/apache/flink/pull/2865

    [FLINK-5002] Renamed getNumberOfUsedBuffers() method to bestEffortGetNumOfUsedBuffers(), add a test to check that it does not return a negative value

    [FLINK-5002] Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers

    According to Stefan's proposal, I renamed the method and added a test to make sure that the method does not return a negative value.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MayerRoman/flink FLINK_5002

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2865.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2865

commit 5b7a17f54d37ac028335343742ba7021e047ca64
Author: Roman Maier
Date: 2016-11-18T13:51:58Z

    [FLINK-5002] Renamed getNumberOfUsedBuffers() method to bestEffortGetNumOfUsedBuffers(), add a test to check that it does not return a negative value.

> Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers
> -
>
> Key: FLINK-5002
> URL: https://issues.apache.org/jira/browse/FLINK-5002
> Project: Flink
> Issue Type: Bug
> Reporter: Ted Yu
> Assignee: Roman Maier
> Priority: Minor
> Labels: easyfix, starter
>
> {code}
> public int getNumberOfUsedBuffers() {
>     return numberOfRequestedMemorySegments - availableMemorySegments.size();
> }
> {code}
> Access to availableMemorySegments should be protected with proper
> synchronization as other methods do.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink pull request #2865: [FLINK-5002] Renamed getNumberOfUsedBuffers() meth...
GitHub user MayerRoman opened a pull request:

    https://github.com/apache/flink/pull/2865

    [FLINK-5002] Renamed getNumberOfUsedBuffers() method to bestEffortGetNumOfUsedBuffers(), add a test to check that it does not return a negative value

    [FLINK-5002] Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers

    According to Stefan's proposal, I renamed the method and added a test to make sure that the method does not return a negative value.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MayerRoman/flink FLINK_5002

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2865.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2865

commit 5b7a17f54d37ac028335343742ba7021e047ca64
Author: Roman Maier
Date: 2016-11-18T13:51:58Z

    [FLINK-5002] Renamed getNumberOfUsedBuffers() method to bestEffortGetNumOfUsedBuffers(), add a test to check that it does not return a negative value.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
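The rename to `bestEffortGetNumOfUsedBuffers()` reflects the chosen trade-off: instead of locking, the getter tolerates a stale read but clamps the transiently negative difference to zero. A heavily reduced sketch of that idea (not the actual `LocalBufferPool` code):

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Simplified sketch of the FLINK-5002 fix: without synchronization, the two
 * reads below can interleave with concurrent recycle() calls, so the raw
 * difference may transiently be negative. A best-effort getter clamps it to
 * zero instead of taking a lock. (Heavily reduced from Flink's LocalBufferPool.)
 */
class ToyBufferPool {
    private volatile int numberOfRequestedMemorySegments;
    private final Queue<Object> availableMemorySegments = new ArrayDeque<>();

    void setState(int requested, int available) {
        numberOfRequestedMemorySegments = requested;
        availableMemorySegments.clear();
        for (int i = 0; i < available; i++) {
            availableMemorySegments.add(new Object());
        }
    }

    /** Best-effort: the result may be slightly stale, but never negative. */
    int bestEffortGetNumOfUsedBuffers() {
        return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
    }

    public static void main(String[] args) {
        ToyBufferPool pool = new ToyBufferPool();
        pool.setState(4, 1);
        if (pool.bestEffortGetNumOfUsedBuffers() != 3) throw new AssertionError();
        // Simulate the mid-update race: more segments available than counted as requested.
        pool.setState(1, 3);
        if (pool.bestEffortGetNumOfUsedBuffers() != 0) throw new AssertionError();
        System.out.println("ok");
    }
}
```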
[jira] [Assigned] (FLINK-1536) Graph partitioning operators for Gelly
[ https://issues.apache.org/jira/browse/FLINK-1536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-1536: - Assignee: Ivan Mushketyk > Graph partitioning operators for Gelly > -- > > Key: FLINK-1536 > URL: https://issues.apache.org/jira/browse/FLINK-1536 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Ivan Mushketyk >Priority: Minor > > Smart graph partitioning can significantly improve the performance and > scalability of graph analysis applications. Depending on the computation > pattern, a graph partitioning algorithm divides the graph into (maybe > overlapping) subgraphs, optimizing some objective. For example, if > communication is performed across graph edges, one might want to minimize the > edges that cross from one partition to another. > The problem of graph partitioning is a well studied problem and several > algorithms have been proposed in the literature. The goal of this project > would be to choose a few existing partitioning techniques and implement the > corresponding graph partitioning operators for Gelly. > Some related literature can be found [here| > http://www.citeulike.org/user/vasiakalavri/tag/graph-partitioning]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4649) Implement bipartite graph metrics
[ https://issues.apache.org/jira/browse/FLINK-4649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15694318#comment-15694318 ] Ivan Mushketyk commented on FLINK-4649: --- Implemented here: https://github.com/mushketyk/flink/tree/bipartite-metrics Blocked by this PR: https://github.com/apache/flink/pull/2564 > Implement bipartite graph metrics > - > > Key: FLINK-4649 > URL: https://issues.apache.org/jira/browse/FLINK-4649 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement metrics calculation for a bipartite graph. Should be similar to > EdgeMetrics and VertexMetrics. > Paper that describes bipartite graph metrics: > http://jponnela.com/web_documents/twomode.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi resolved FLINK-3702. --- Resolution: Fixed Fix Version/s: 1.2.0 Fixed via 1f04542 and 870e219. > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > Fix For: 1.2.0 > > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15694237#comment-15694237 ] ASF GitHub Bot commented on FLINK-3702: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2094 > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
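The core of nested field access (what a nested `FieldAccessor` fundamentally has to do for paths like {{"inner.value"}}) is walking a dotted path with reflection. A minimal sketch with made-up POJO classes, not Flink's implementation:

```java
import java.lang.reflect.Field;

/**
 * Sketch of nested POJO field access: resolve a dotted path ("inner.value")
 * step by step via reflection. The POJO classes below are illustrative only.
 */
class NestedFieldAccess {
    static Object get(Object pojo, String path) {
        try {
            Object current = pojo;
            for (String name : path.split("\\.")) {
                Field f = current.getClass().getDeclaredField(name);
                f.setAccessible(true);
                current = f.get(current);   // descend one level per path segment
            }
            return current;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Hypothetical nested POJOs.
    static class Inner { public int value = 42; }
    static class Outer { public Inner inner = new Inner(); }

    public static void main(String[] args) {
        Object v = get(new Outer(), "inner.value");
        if (!Integer.valueOf(42).equals(v)) throw new AssertionError("got " + v);
        System.out.println("inner.value = " + v);
    }
}
```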
[GitHub] flink pull request #2094: [FLINK-3702] Make FieldAccessors support nested fi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2094 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2804: [FLINK-5067] Make Flink compile with 1.8 Java compiler
Github user melentye commented on the issue:

    https://github.com/apache/flink/pull/2804

    @StephanEwen it seems that the semantics are different when java.version is just defined in pom.xml versus being overridden with a -Djava.version=x.y.z argument passed to mvn. org.apache.hadoop.hbase.util.ClassSize expects a very specific format of the java.version system property, but when java.version is only defined in pom.xml it doesn't actually become a system property, unlike when -Djava.version is used.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-5067) Make Flink compile with 1.8 Java compiler
[ https://issues.apache.org/jira/browse/FLINK-5067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693968#comment-15693968 ]

ASF GitHub Bot commented on FLINK-5067:
---

Github user melentye commented on the issue:

    https://github.com/apache/flink/pull/2804

    @StephanEwen it seems that the semantics are different when java.version is just defined in pom.xml versus being overridden with a -Djava.version=x.y.z argument passed to mvn. org.apache.hadoop.hbase.util.ClassSize expects a very specific format of the java.version system property, but when java.version is only defined in pom.xml it doesn't actually become a system property, unlike when -Djava.version is used.

> Make Flink compile with 1.8 Java compiler
> -
>
> Key: FLINK-5067
> URL: https://issues.apache.org/jira/browse/FLINK-5067
> Project: Flink
> Issue Type: Improvement
> Components: Build System
> Affects Versions: 1.2.0
> Environment: macOS Sierra 10.12.1, java version "1.8.0_112", Apache Maven 3.3.9
> Reporter: Andrey Melentyev
> Priority: Minor
>
> Flink fails to compile when using 1.8 as source and target in Maven. There
> are two types of issues, both related to the new type inference rules:
> * The call to the TypeSerializer.copy method in TupleSerializer.java:112 now resolves
>   to a different overload than before, causing a compilation error:
>   [ERROR] /Users/andrey.melentyev/Dev/github.com/apache/flink/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java:[112,63] incompatible types: void cannot be converted to java.lang.Object
> * A number of unit tests using assertEquals fail to compile:
>   [ERROR] /Users/andrey.melentyev/Dev/github.com/apache/flink/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java:[50,25] reference to assertEquals is ambiguous
>   [ERROR] both method assertEquals(long,long) in org.junit.Assert and method assertEquals(java.lang.Object,java.lang.Object) in org.junit.Assert match
> In both of the above scenarios, explicitly casting one of the arguments helps
> the compiler resolve the overloaded method call correctly.
> It is possible to maintain Flink's code base in a state where it can be built
> by both 1.7 and 1.8. For this purpose we need minor code fixes and an
> automated build in Travis to keep the new good state.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
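The casting workaround described in the issue can be reproduced with a toy pair of overloads mirroring JUnit's `assertEquals` signatures; the class below is illustrative, not `org.junit.Assert`:

```java
/**
 * Toy reproduction of the cast workaround from FLINK-5067. With boxed
 * arguments, both assertEquals(long, long) (via unboxing) and
 * assertEquals(Object, Object) can become applicable under Java 8 inference
 * in some call sites; an explicit cast pins one overload. Each overload here
 * returns a tag so the test can see which one was selected.
 */
class OverloadDemo {
    static String assertEquals(long expected, long actual) {
        if (expected != actual) throw new AssertionError();
        return "long overload";
    }

    static String assertEquals(Object expected, Object actual) {
        if (!expected.equals(actual)) throw new AssertionError();
        return "Object overload";
    }

    public static void main(String[] args) {
        Integer boxed = 50;
        String a = assertEquals(50L, (long) boxed);   // cast forces the primitive overload
        String b = assertEquals((Object) 50, boxed);  // cast forces the Object overload
        if (!a.equals("long overload") || !b.equals("Object overload")) {
            throw new AssertionError(a + " / " + b);
        }
        System.out.println(a + " / " + b);
    }
}
```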
[jira] [Commented] (FLINK-4913) Per-job Yarn clusters: include user jar in system class loader
[ https://issues.apache.org/jira/browse/FLINK-4913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693942#comment-15693942 ] ASF GitHub Bot commented on FLINK-4913: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2795 Rebased and pushed to travis again: https://travis-ci.org/rmetzger/flink/builds/178664750 > Per-job Yarn clusters: include user jar in system class loader > --- > > Key: FLINK-4913 > URL: https://issues.apache.org/jira/browse/FLINK-4913 > Project: Flink > Issue Type: Improvement > Components: Client, YARN Client >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.2.0, 1.1.4 > > > Including the jar directly in the system classloader avoids loading it for > every instantiation of the ExecutionGraph and every Task execution. Note, > this is only possible for per-job clusters (i.e. Yarn/Mesos). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2795: Revert "[FLINK-4913][yarn] include user jars in system cl...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2795 Rebased and pushed to travis again: https://travis-ci.org/rmetzger/flink/builds/178664750 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2856: Removed excessive tests.
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2856 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5055) Security feature crashes JM for certain Hadoop versions even though using no Kerberos
[ https://issues.apache.org/jira/browse/FLINK-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693816#comment-15693816 ] ASF GitHub Bot commented on FLINK-5055: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2864 [FLINK-5055][security] skip Hadoop UGI login if unsecured The new Kerberos authentication code in Flink assumed that it's running against vanilla Hadoop. Original Hadoop's behavior is to skip a secure login if security is not configured. This is different for other distributions, e.g. the MapR Hadoop distribution of Hadoop. Thus, we need to make sure we don't perform any login action if security is not configured. This also performs minor code cleanup. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-5055 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2864.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2864 commit 8193024a6451dd2594348ac0f001ed39b80f7302 Author: Maximilian Michels Date: 2016-11-24T16:12:39Z [FLINK-5055][security] skip Hadoop UGI login if unsecured The new Kerberos authentication code in Flink assumed that it's running against vanilla Hadoop. Original Hadoop's behavior is to skip a secure login if security is not configured. This is different for other distributions, e.g. the MapR Hadoop distribution of Hadoop. Thus, we need to make sure we don't perform any login action if security is not configured. This also performs minor code cleanup.
> Security feature crashes JM for certain Hadoop versions even though using no > Kerberos > - > > Key: FLINK-5055 > URL: https://issues.apache.org/jira/browse/FLINK-5055 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Maximilian Michels >Priority: Critical > Fix For: 1.2.0 > > > A user reported [1] that the {{JobManager}} does not start when using Flink > with Hadoop-2.7.0-mapr-1607 and no security activated because of > {code} > javax.security.auth.login.LoginException: Unable to obtain Principal Name for > authentication > at > com.sun.security.auth.module.Krb5LoginModule.promptForName(Krb5LoginModule.java:841) > at > com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:704) > at > com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617) > {code} > It seems that this Hadoop version always tries to login via Kerberos even > though the user did not activate it and, thus, should use > {{AuthenticationMode.SIMPLE}}. > I'm not really familiar with the security feature, but my understanding is > that it should not have any effect on Flink when not activated. I might be > wrong here, but if not, then we should fix this problem for 1.2.0 because it > prevents people from using Flink. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-using-Yarn-on-MapR-td14484.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2864: [FLINK-5055][security] skip Hadoop UGI login if un...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2864 [FLINK-5055][security] skip Hadoop UGI login if unsecured The new Kerberos authentication code in Flink assumed that it's running against vanilla Hadoop. Original Hadoop's behavior is to skip a secure login if security is not configured. This is different for other distributions, e.g. the MapR Hadoop distribution of Hadoop. Thus, we need to make sure we don't perform any login action if security is not configured. This also performs minor code cleanup. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-5055 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2864.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2864 commit 8193024a6451dd2594348ac0f001ed39b80f7302 Author: Maximilian Michels Date: 2016-11-24T16:12:39Z [FLINK-5055][security] skip Hadoop UGI login if unsecured The new Kerberos authentication code in Flink assumed that it's running against vanilla Hadoop. Original Hadoop's behavior is to skip a secure login if security is not configured. This is different for other distributions, e.g. the MapR Hadoop distribution of Hadoop. Thus, we need to make sure we don't perform any login action if security is not configured. This also performs minor code cleanup. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
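The essence of the fix — only attempt a Kerberos login when security is actually configured — can be sketched without Hadoop on the classpath. The guard models Hadoop's `UserGroupInformation.isSecurityEnabled()`, which is driven by the `hadoop.security.authentication` property (default `simple`); the surrounding method names below are made up:

```java
import java.util.Map;

/**
 * Sketch of the guard introduced by the FLINK-5055 fix: perform a Kerberos
 * login only when the configuration actually enables security; otherwise fall
 * through to the simple (unauthenticated) path. This models Hadoop's
 * UserGroupInformation.isSecurityEnabled() without depending on Hadoop;
 * installSecurityContext() is a hypothetical name.
 */
class SecurityGuardSketch {
    static boolean isSecurityEnabled(Map<String, String> conf) {
        // Hadoop treats "simple" (the default) as security disabled.
        return !"simple".equals(conf.getOrDefault("hadoop.security.authentication", "simple"));
    }

    static String installSecurityContext(Map<String, String> conf) {
        if (!isSecurityEnabled(conf)) {
            return "simple-login";   // skip any UGI Kerberos login entirely
        }
        return "kerberos-login";     // a real implementation would log in via keytab/ticket here
    }

    public static void main(String[] args) {
        if (!installSecurityContext(Map.of()).equals("simple-login")) {
            throw new AssertionError();
        }
        if (!installSecurityContext(Map.of("hadoop.security.authentication", "kerberos"))
                .equals("kerberos-login")) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```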
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693706#comment-15693706 ]

ASF GitHub Bot commented on FLINK-4541:
---

Github user AlexanderShoshin commented on the issue:

    https://github.com/apache/flink/pull/2811

    Sure, I will do it tomorrow :)

> Support for SQL NOT IN operator
> ---
>
> Key: FLINK-4541
> URL: https://issues.apache.org/jira/browse/FLINK-4541
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Alexander Shoshin
>
> This should work:
> {code}
> def main(args: Array[String]): Unit = {
>   // set up execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
>   // register the DataSet as table "WordCount"
>   tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
>   tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, 'frequency).select('word).filter('word !== "hello"))
>   // run a SQL query on the Table and retrieve the result as a new Table
>   val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word")
>   table.toDataSet[WC].print()
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user AlexanderShoshin commented on the issue: https://github.com/apache/flink/pull/2811 Sure, I will do it tomorrow :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
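The `NOT IN` query from the issue is effectively an anti-join. Its expected result on the example data can be checked with plain collections (a toy model, not the Table API):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Plain-collections model of the NOT IN query from FLINK-4541:
 *   SELECT word, SUM(frequency) FROM WordCount
 *   WHERE word NOT IN (SELECT word FROM WordCount2) GROUP BY word
 * WordCount2 holds the words != "hello", so only "hello" survives the anti-join.
 */
class NotInSketch {
    static class WC {
        final String word;
        final int frequency;
        WC(String word, int frequency) { this.word = word; this.frequency = frequency; }
    }

    public static void main(String[] args) {
        List<WC> wordCount = List.of(new WC("hello", 1), new WC("hello", 1), new WC("ciao", 1));

        // WordCount2: words other than "hello".
        Set<String> wordCount2 = wordCount.stream()
                .map(wc -> wc.word)
                .filter(w -> !w.equals("hello"))
                .collect(Collectors.toSet());

        // NOT IN as an anti-join: keep rows whose word is absent from wordCount2, then group and sum.
        Map<String, Integer> result = wordCount.stream()
                .filter(wc -> !wordCount2.contains(wc.word))
                .collect(Collectors.groupingBy(wc -> wc.word,
                        Collectors.summingInt(wc -> wc.frequency)));

        if (!result.equals(Map.of("hello", 2))) throw new AssertionError(result.toString());
        System.out.println(result);
    }
}
```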
[jira] [Commented] (FLINK-5051) Backwards compatibility for serializers in backend state
[ https://issues.apache.org/jira/browse/FLINK-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693681#comment-15693681 ]

ASF GitHub Bot commented on FLINK-5051:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2863

    R @aljoscha and whoever is interested.

> Backwards compatibility for serializers in backend state
>
>
> Key: FLINK-5051
> URL: https://issues.apache.org/jira/browse/FLINK-5051
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
>
> When a new state is registered, e.g. in a keyed backend via
> `getPartitionedState`, the caller has to provide all type serializers
> required for the persistence of state components. Explicitly passing the
> serializers on state creation already allows for potential version upgrades
> of serializers.
> However, those serializers are currently not part of any snapshot and are
> only provided at runtime, when the state is newly registered or restored. For
> backwards compatibility, this has strong implications: checkpoints are not
> self-contained in that state is currently a black box without knowledge about
> its corresponding serializers. Most cases where we would need to restructure
> the state are basically lost. We could only convert them lazily at runtime
> and only once the user is registering the concrete state, which might happen
> at unpredictable points.
> I suggest to adapt our solution as follows:
> - As now, all states are registered with their set of serializers.
> - Unlike now, all serializers are written to the snapshot. This makes
> savepoints self-contained and also allows to create inspection tools for
> savepoints at some point in the future.
> - Introduce an interface {{Versioned}} with {{long getVersion()}} and
> {{boolean isCompatible(Versioned v)}}, which is then implemented by
> serializers. Compatible serializers must ensure that they can deserialize
> older versions, and can then serialize them in their new format. This is how
> we upgrade.
> We need to find the right tradeoff in how many places we need to store the
> serializers. I suggest to write them once per parallel operator instance for
> each state, i.e. we have a map with state_name -> Tuple3<serializer, serializer, serializer>. This could go before all
> key-groups are written, right at the head of the file. Then, for each file we
> see on restore, we can first read the serializer map from the head of the
> stream, then go through the key groups by offset.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
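The {{Versioned}} contract proposed in the issue can be sketched directly; the serializer class and its compatibility policy below are hypothetical:

```java
/**
 * Sketch of the Versioned interface proposed in FLINK-5051: serializers carry
 * a version and decide whether they can read data written by another (older)
 * instance. Method names follow the proposal; DemoSerializer and its policy
 * ("same family, version not newer than mine") are made up for illustration.
 */
interface Versioned {
    long getVersion();
    boolean isCompatible(Versioned v);
}

class DemoSerializer implements Versioned {
    private final long version;

    DemoSerializer(long version) { this.version = version; }

    @Override
    public long getVersion() { return version; }

    /** A v-N serializer can restore snapshots written by versions <= N. */
    @Override
    public boolean isCompatible(Versioned v) {
        return v instanceof DemoSerializer && v.getVersion() <= version;
    }
}

class RestoreCheck {
    public static void main(String[] args) {
        DemoSerializer writtenWith = new DemoSerializer(1);    // serializer stored in the snapshot
        DemoSerializer restoringWith = new DemoSerializer(2);  // serializer registered at runtime
        if (!restoringWith.isCompatible(writtenWith)) throw new AssertionError();
        // Downgrades are rejected: a v1 serializer cannot read v2 data.
        if (writtenWith.isCompatible(restoringWith)) throw new AssertionError();
        System.out.println("compatible upgrade: v1 -> v2");
    }
}
```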
[GitHub] flink issue #2863: [FLINK-5051] Backwards compatibility for serializers in b...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2863 R @aljoscha and whoever is interested. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5051) Backwards compatibility for serializers in backend state
[ https://issues.apache.org/jira/browse/FLINK-5051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693679#comment-15693679 ] ASF GitHub Bot commented on FLINK-5051: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2863 [FLINK-5051] Backwards compatibility for serializers in backend state This PR sits on top of PR #2781 and introduces backwards compatibility for state serializers in keyed backends. We do so by providing version compatibility checking for ``TypeSerializer`` and making the serializers a mandatory part of a keyed backend's meta data in checkpoints (so that we have everything required to reconstruct states in a self-contained way). A serialization proxy is introduced for keyed backend state. Currently this serialization proxy covers the meta data, not yet the actual data. As the PR essentially moves functionality to a different place, it is already covered by existing tests. Notice: we should introduce a similar approach for ``OperatorStateBackend``s.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink serializer-backwards-compatibility Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2863.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2863 commit a373585c2fe71b467f49f0e295dc647b43ab7a9c Author: Stefan Richter Date: 2016-11-01T11:29:01Z Backwards compatibility 1.1 -> 1.2 commit 8e4e4bcede50e66a95928ec854e51d45a7df28bf Author: Stefan Richter Date: 2016-11-09T13:54:35Z Removing some unecessary code from migration classes commit 78bd66fade7f836eafbab978329caf1ea26f2ffc Author: Stefan Richter Date: 2016-11-09T17:21:13Z MultiStreamStateHandle commit a9355679c3476dd890b54312e1696b61c7839873 Author: Stefan Richter Date: 2016-11-10T13:18:55Z Added migration unit test commit d079bd4bdb762c307a3c5cd084590804b90996b1 Author: Stefan Richter Date: 2016-11-10T13:45:58Z rebase fixes commit 9f47bac9c25fc33993c3942a57462039cc578dcd Author: Stefan Richter Date: 2016-11-11T13:46:39Z Minor cleanups: deleting more unnecessary classes commit 2bbe66386d28c7914c62e2c3829ff3ab6840164c Author: Stefan Richter Date: 2016-11-23T13:15:33Z Versioned serialization commit 6460e27717ab208aada988ba2c83d5628b31b310 Author: Stefan Richter Date: 2016-11-23T17:59:45Z Common meta info introduced to keyed backends commit e7d66377730339523bad8e3e6e75865ea5a29a6b Author: Stefan Richter Date: 2016-11-23T21:40:26Z Introducing isCompatibleWith to TypeSerializers commit 89e3779d231fd0dadb01782791c92ec8ebb15a81 Author: Stefan Richter Date: 2016-11-23T22:33:42Z Splitting / Introducing interface for versiond and compatibile commit 6714a7efd3d839befda7a9b744311494e4ecb714 Author: Stefan Richter Date: 2016-11-24T10:59:01Z Cleanup and documentation commit 6df300f7f5a7d7b38b00ecd6636ecd53bc15d370 Author: Stefan Richter Date: 2016-11-24T11:18:43Z
Cleanup and documentation commit b22273455d8f9282af715e502244811543e3fb99 Author: Stefan Richter Date: 2016-11-24T16:19:51Z Better abstraction > Backwards compatibility for serializers in backend state > > > Key: FLINK-5051 > URL: https://issues.apache.org/jira/browse/FLINK-5051 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter > > When a new state is registered, e.g. in a keyed backend via > `getPartitionedState`, the caller has to provide all type serializers > required for the persistence of state components. Explicitly passing the > serializers on state creation already allows for potential version upgrades > of serializers. > However, those serializers are currently not part of any snapshot and are > only provided at runtime, when the state is newly registered or restored. For > backwards compatibility, this has strong implications: checkpoints are not > self-contained in that state is currently a black box without knowledge about > its corresponding serializers. Most cases where we would need to restructure > the state are basically lost. We could only convert them lazily at runtime > and only once the user is registering the concrete state, which might happen > at unpredictable points. > I suggest to adapt our solution as follows: > - As now, all states are registered with their set of serializers. > - Unlike now, all serializers are written to the snapshot. This makes > savepoints self-contained and also allows to create inspection tools for > savepoints at some point in the
[GitHub] flink pull request #2863: [FLINK-5051] Backwards compatibility for serialize...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2863 [FLINK-5051] Backwards compatibility for serializers in backend state This PR sits on top of PR #2781 and introduces backwards compatibility for state serializers in keyed backends. We do so by providing version compatibility checking for ``TypeSerializer`` and making the serializers mandatory part of a keyed backend's meta data in checkpoints (so that we have everything required to reconstruct states in a self contained way). A serialization proxy is introduced for keyed backend state. Currently this serialization proxy covers the meta data, not yet the actual data. As the PR essentially moves functionality to a different place, it is already covered by existing tests. Notice: we should introduce a similar approach for ``OperatorStateBackend``s. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink serializer-backwards-compatibility Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2863.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2863 commit a373585c2fe71b467f49f0e295dc647b43ab7a9c Author: Stefan Richter Date: 2016-11-01T11:29:01Z Backwards compatibility 1.1 -> 1.2 commit 8e4e4bcede50e66a95928ec854e51d45a7df28bf Author: Stefan Richter Date: 2016-11-09T13:54:35Z Removing some unecessary code from migration classes commit 78bd66fade7f836eafbab978329caf1ea26f2ffc Author: Stefan Richter Date: 2016-11-09T17:21:13Z MultiStreamStateHandle commit a9355679c3476dd890b54312e1696b61c7839873 Author: Stefan Richter Date: 2016-11-10T13:18:55Z Added migration unit test commit d079bd4bdb762c307a3c5cd084590804b90996b1 Author: Stefan Richter Date: 2016-11-10T13:45:58Z rebase fixes commit 9f47bac9c25fc33993c3942a57462039cc578dcd Author: Stefan Richter Date: 2016-11-11T13:46:39Z
Minor cleanups: deleting more unnecessary classes commit 2bbe66386d28c7914c62e2c3829ff3ab6840164c Author: Stefan Richter Date: 2016-11-23T13:15:33Z Versioned serialization commit 6460e27717ab208aada988ba2c83d5628b31b310 Author: Stefan Richter Date: 2016-11-23T17:59:45Z Common meta info introduced to keyed backends commit e7d66377730339523bad8e3e6e75865ea5a29a6b Author: Stefan Richter Date: 2016-11-23T21:40:26Z Introducing isCompatibleWith to TypeSerializers commit 89e3779d231fd0dadb01782791c92ec8ebb15a81 Author: Stefan Richter Date: 2016-11-23T22:33:42Z Splitting / Introducing interface for versioned and compatible commit 6714a7efd3d839befda7a9b744311494e4ecb714 Author: Stefan Richter Date: 2016-11-24T10:59:01Z Cleanup and documentation commit 6df300f7f5a7d7b38b00ecd6636ecd53bc15d370 Author: Stefan Richter Date: 2016-11-24T11:18:43Z Cleanup and documentation commit b22273455d8f9282af715e502244811543e3fb99 Author: Stefan Richter Date: 2016-11-24T16:19:51Z Better abstraction --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
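The versioning scheme described in this PR can be illustrated with a plain-Java sketch (class name, version constant, and payload format are hypothetical, not Flink's actual implementation): the serialization proxy writes a version header in front of the meta data, and the read path checks that header before deserializing, so an incompatible checkpoint format fails fast instead of producing garbage.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of a versioned serialization proxy for backend meta data.
public class VersionedMetaDataProxy {

    static final int VERSION = 2;

    // Prefix the meta data payload with a version header.
    public static byte[] write(String metaData) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(VERSION);
        out.writeUTF(metaData);
        out.flush();
        return bos.toByteArray();
    }

    // Check the version header before reading the payload; older versions
    // would branch into migration code, newer ones are rejected.
    public static String read(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        if (version > VERSION) {
            throw new IOException("Incompatible meta data version: " + version);
        }
        return in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(read(write("key-serializer-config")));
    }
}
```

Branching on older versions in `read` is what would make a 1.1 -> 1.2 restore possible while still rejecting formats the code does not know.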
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693660#comment-15693660 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2811 It's true, the requirements of this issue have evolved quite a bit and we should adapt the JIRA issue for that ;-) +1 for creating an issue to support arbitrary inner joins with a single row input and addressing that issue with this PR. Once that issue is resolved, we can revisit FLINK-4541 and see what's left to do. @AlexanderShoshin, can you open the JIRA issue and adapt this PR to it? > Support for SQL NOT IN operator > --- > > Key: FLINK-4541 > URL: https://issues.apache.org/jira/browse/FLINK-4541 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alexander Shoshin > > This should work: > {code} > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > // register the DataSet as table "WordCount" > tEnv.registerDataSet("WordCount", input, 'word, 'frequency) > tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, > 'frequency).select('word).filter('word !== "hello")) > // run a SQL query on the Table and retrieve the result as a new Table > val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE > word NOT IN (SELECT word FROM WordCount2) GROUP BY word") > table.toDataSet[WC].print() > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
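Semantically, the NOT IN subquery in the example above is an anti-join: keep only the rows whose key does not occur in the subquery result. A stdlib-only Java sketch of just that semantics (toy data mirroring the WordCount example; this is not the Table API translation under discussion):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class NotInSketch {

    // NOT IN as an anti-join: keep elements absent from the subquery result.
    public static List<String> notIn(List<String> data, Set<String> subquery) {
        return data.stream()
                .filter(w -> !subquery.contains(w))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> wordCount = Arrays.asList("hello", "hello", "ciao");
        // Subquery result: SELECT word FROM WordCount2 (words other than "hello").
        Set<String> wordCount2 = wordCount.stream()
                .filter(w -> !w.equals("hello"))
                .collect(Collectors.toSet());
        System.out.println(notIn(wordCount, wordCount2)); // [hello, hello]
    }
}
```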
[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2811 It's true, the requirements of this issue have evolved quite a bit and we should adapt the JIRA issue for that ;-) +1 for creating an issue to support arbitrary inner joins with a single row input and addressing that issue with this PR. Once that issue is resolved, we can revisit FLINK-4541 and see what's left to do. @AlexanderShoshin, can you open the JIRA issue and adapt this PR to it?
[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2664 I had the same thought. We could add the maven assembly plugin / shade plugin to each connector / library to build a fat jar, and then add some logic to flink-dist to collect these fat jars into the final dist. I'm not sure how easy it is to pull build outputs from other modules into the dist module, but we are doing that for the examples as well.
[jira] [Commented] (FLINK-4861) Package optional project artifacts
[ https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693654#comment-15693654 ] ASF GitHub Bot commented on FLINK-4861: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2664 I had the same thought. We could add the maven assembly plugin / shade plugin to each connector / library to build a fat jar, and then add some logic to flink-dist to collect these fat jars into the final dist. I'm not sure how easy it is to pull build outputs from other modules into the dist module, but we are doing that for the examples as well. > Package optional project artifacts > -- > > Key: FLINK-4861 > URL: https://issues.apache.org/jira/browse/FLINK-4861 > Project: Flink > Issue Type: New Feature > Components: Build System >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Per the mailing list > [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html], > package the Flink libraries and connectors into subdirectories of a new > {{opt}} directory in the release/snapshot tarballs.
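One possible shape of the per-module fat-jar step discussed here is a shade-plugin fragment in a connector's pom.xml (illustrative configuration only, not the final build setup; flink-dist would then copy the attached fat jars into the {{opt}} directory):

```xml
<!-- In a connector/library module: attach a self-contained (fat) jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <!-- Keep the thin jar; attach the fat jar under a classifier. -->
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>fat</shadedClassifierName>
      </configuration>
    </execution>
  </executions>
</plugin>
```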
[jira] [Assigned] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-5158: Assignee: Till Rohrmann > Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator > > > Key: FLINK-5158 > URL: https://issues.apache.org/jira/browse/FLINK-5158 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > The checkpoint coordinator does not properly handle exceptions when trying to > store completed checkpoints. As a result, completed checkpoints are not > properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get > stuck and stop triggering checkpoints.
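The fix direction implied by this ticket can be sketched with a stdlib-only toy (class and method names are illustrative, not the actual CheckpointCoordinator API): if adding a completed checkpoint to the store throws, the checkpoint is discarded so its state is cleaned up, and control returns to the caller instead of leaving the coordinator stuck.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of "store completed checkpoint, clean up on failure".
public class CheckpointStoreSketch {

    public static class Checkpoint {
        public final long id;
        public boolean discarded;
        public Checkpoint(long id) { this.id = id; }
        public void discard() { discarded = true; } // release state handles
    }

    private final Deque<Checkpoint> completed = new ArrayDeque<>();
    private final boolean storeFails; // simulates e.g. a ZooKeeper outage

    public CheckpointStoreSketch(boolean storeFails) { this.storeFails = storeFails; }

    /** Returns true if stored; on failure the checkpoint is discarded. */
    public boolean tryStore(Checkpoint cp) {
        try {
            if (storeFails) {
                throw new IllegalStateException("checkpoint store unavailable");
            }
            completed.add(cp);
            return true;
        } catch (Exception e) {
            // The important part: do not leak the checkpoint, and return
            // control so the coordinator can keep triggering checkpoints.
            cp.discard();
            return false;
        }
    }

    public static void main(String[] args) {
        CheckpointStoreSketch broken = new CheckpointStoreSketch(true);
        Checkpoint cp = new Checkpoint(42);
        System.out.println(broken.tryStore(cp) + " discarded=" + cp.discarded);
    }
}
```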
[jira] [Created] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
Till Rohrmann created FLINK-5158: Summary: Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator Key: FLINK-5158 URL: https://issues.apache.org/jira/browse/FLINK-5158 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.1.3, 1.2.0 Reporter: Till Rohrmann Fix For: 1.2.0, 1.1.4 The checkpoint coordinator does not properly handle exceptions when trying to store completed checkpoints. As a result, completed checkpoints are not properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get stuck and stop triggering checkpoints.
[GitHub] flink issue #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user AlexanderShoshin commented on the issue: https://github.com/apache/flink/pull/2811 I think that calling a JoinFunction inside a RichMapRunner makes sense. I would also prefer not to touch the code generation if possible. But shouldn't we separate the support of all inner joins with a single row input from this "NOT IN" pull request? We might create a new JIRA issue to do this in another pull request. What do you think?
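The suggestion above — running the join logic inside a map when one side is a single row — can be shown with plain Java (toy types, with a BiFunction standing in for the JoinFunction; in Flink the single row would arrive via a broadcast set inside a RichMapFunction's open()):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class SingleRowJoinSketch {

    // A join against a single-row input degenerates into a map over the big side.
    public static <L, R, O> List<O> joinWithSingleRow(
            List<L> bigSide, R singleRow, BiFunction<L, R, O> joinFunction) {
        return bigSide.stream()
                .map(left -> joinFunction.apply(left, singleRow))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The "single row" side, e.g. a global aggregate computed elsewhere.
        Integer singleRow = 10;
        List<Integer> bigSide = Arrays.asList(1, 2, 3);
        System.out.println(joinWithSingleRow(bigSide, singleRow, (l, r) -> l + r)); // [11, 12, 13]
    }
}
```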
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693640#comment-15693640 ] ASF GitHub Bot commented on FLINK-4541: --- Github user AlexanderShoshin commented on the issue: https://github.com/apache/flink/pull/2811 I think that calling a JoinFunction inside a RichMapRunner makes sense. I would also prefer not to touch the code generation if possible. But shouldn't we separate the support of all inner joins with a single row input from this "NOT IN" pull request? We might create a new JIRA issue to do this in another pull request. What do you think? > Support for SQL NOT IN operator > --- > > Key: FLINK-4541 > URL: https://issues.apache.org/jira/browse/FLINK-4541 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alexander Shoshin > > This should work: > {code} > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > // register the DataSet as table "WordCount" > tEnv.registerDataSet("WordCount", input, 'word, 'frequency) > tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, > 'frequency).select('word).filter('word !== "hello")) > // run a SQL query on the Table and retrieve the result as a new Table > val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE > word NOT IN (SELECT word FROM WordCount2) GROUP BY word") > table.toDataSet[WC].print() > } > {code}
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693565#comment-15693565 ] Ivan Mushketyk commented on FLINK-2254: --- Hi [~vkalavri] Thank you for your advice. I've implemented simple metrics (will push them today), but like many other bipartite graph PRs it is blocked by this PR: https://github.com/apache/flink/pull/2564 Regarding the clustering coefficient: do you want me to add another JIRA task for this? Should it address one type of clustering coefficient or should we have multiple types implemented? > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf).
[jira] [Comment Edited] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693565#comment-15693565 ] Ivan Mushketyk edited comment on FLINK-2254 at 11/24/16 3:40 PM: - Hi [~vkalavri] Thank you for your advice I've implemented simple metrics (will push them today) but as many other bipartite graph PRs it is blocked by this PR: https://github.com/apache/flink/pull/2564 Regarding clustering coefficient. Do you want me to add another JIRA task for this? Should it address one type of clustering coefficient or should we have multiple types implemented? was (Author: ivan.mushketyk): Hi [~vkalavri] Thank you for your advice I've implemented simple metrics (will push them today) but as many other bipartite graph PR it is blocked by this PR: https://github.com/apache/flink/pull/2564 Regarding clustering coefficient. Do you want me to add another JIRA task for this? Should it address one type of clustering coefficient or should we have multiple types implemented? > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf).
[jira] [Commented] (FLINK-4741) WebRuntimeMonitor does not shut down all of it's threads (EventLoopGroups) on exit.
[ https://issues.apache.org/jira/browse/FLINK-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693546#comment-15693546 ] ASF GitHub Bot commented on FLINK-4741: --- GitHub user MayerRoman opened a pull request: https://github.com/apache/flink/pull/2862 [FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when stopping the WebRuntimeMonitor [FLINK-4741] WebRuntimeMonitor does not shut down all of its threads (EventLoopGroups) on exit. A call to bootstrap.childGroup().shutdownGracefully() has been added so that the WebRuntimeMonitor shuts down correctly. More detailed explanation: bootstrap is an io.netty.bootstrap.ServerBootstrap, a helper class that sets up a Netty server. A Netty server uses two EventLoopGroups: the first one, often called 'boss', accepts incoming connections; the second one, often called 'worker', handles the traffic of an accepted connection once the boss has accepted it and registered it with the worker. When the Netty server stops, all threads in both EventLoopGroups should be shut down. The first EventLoopGroup stops when bootstrap.group().shutdownGracefully() is called. To stop the second EventLoopGroup, this PR adds a call to bootstrap.childGroup().shutdownGracefully(). 
This proposal corresponds to the Netty user guide: http://netty.io/wiki/user-guide-for-4.x.html You can merge this pull request into a Git repository by running: $ git pull https://github.com/MayerRoman/flink FLINK-4741 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2862.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2862 commit 0d43c9a92e806e2c663d61b8db5098622474bf0e Author: Roman Maier Date: 2016-11-24T12:49:00Z [FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when stopping the WebRuntimeMonitor > WebRuntimeMonitor does not shut down all of its threads (EventLoopGroups) on > exit. > --- > > Key: FLINK-4741 > URL: https://issues.apache.org/jira/browse/FLINK-4741 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.1.2 >Reporter: Joseph Sims >Assignee: Roman Maier >Priority: Minor > > WebRuntimeMonitor does not shut down correctly, causing the overall > application to hang on shutdown. It shuts down bootstrap.group > (EventLoopGroup) but not the bootstrap.childGroup (EventLoopGroup). > If WebRuntimeMonitor is not used (local.start-webserver=false), this problem > does not occur. > Class: WebRuntimeMonitor > method: stop() > Line: ~387 > Called: > bootstrap.group().shutdownGracefully() > Not called: > bootstrap.childGroup().shutdownGracefully()
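The bug pattern described above is not Netty-specific: any component owning two thread pools must shut down both, or non-daemon threads keep the JVM alive. A stdlib-only analogue of the fix (ExecutorService standing in for Netty's boss and worker EventLoopGroups; this is a sketch, not the WebRuntimeMonitor code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DualPoolShutdownSketch {

    // Analogous to bootstrap.group() ("boss") and bootstrap.childGroup() ("worker").
    final ExecutorService bossGroup = Executors.newFixedThreadPool(1);
    final ExecutorService workerGroup = Executors.newFixedThreadPool(4);

    public void stop() {
        bossGroup.shutdown();
        // The fix: without this second call, worker threads stay alive
        // and the application hangs on exit.
        workerGroup.shutdown();
    }

    public static void main(String[] args) {
        DualPoolShutdownSketch server = new DualPoolShutdownSketch();
        server.stop();
        System.out.println(server.bossGroup.isShutdown() && server.workerGroup.isShutdown()); // true
    }
}
```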
[GitHub] flink pull request #2862: [FLINK-4741] Fix for the proper shutdown the Serve...
GitHub user MayerRoman opened a pull request: https://github.com/apache/flink/pull/2862 [FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when stopping the WebRuntimeMonitor [FLINK-4741] WebRuntimeMonitor does not shut down all of its threads (EventLoopGroups) on exit. A call to bootstrap.childGroup().shutdownGracefully() has been added so that the WebRuntimeMonitor shuts down correctly. More detailed explanation: bootstrap is an io.netty.bootstrap.ServerBootstrap, a helper class that sets up a Netty server. A Netty server uses two EventLoopGroups: the first one, often called 'boss', accepts incoming connections; the second one, often called 'worker', handles the traffic of an accepted connection once the boss has accepted it and registered it with the worker. When the Netty server stops, all threads in both EventLoopGroups should be shut down. The first EventLoopGroup stops when bootstrap.group().shutdownGracefully() is called. To stop the second EventLoopGroup, this PR adds a call to bootstrap.childGroup().shutdownGracefully(). This proposal corresponds to the Netty user guide: http://netty.io/wiki/user-guide-for-4.x.html You can merge this pull request into a Git repository by running: $ git pull https://github.com/MayerRoman/flink FLINK-4741 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2862.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2862 commit 0d43c9a92e806e2c663d61b8db5098622474bf0e Author: Roman Maier Date: 2016-11-24T12:49:00Z [FLINK-4741] Fix for the proper shutdown of the ServerBootstrap when stopping the WebRuntimeMonitor
[jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693506#comment-15693506 ] ASF GitHub Bot commented on FLINK-2254: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2564 @vasia I don't think anybody is shepherding this PR :) > Add Bipartite Graph Support for Gelly > - > > Key: FLINK-2254 > URL: https://issues.apache.org/jira/browse/FLINK-2254 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.10.0 >Reporter: Andra Lungu >Assignee: Ivan Mushketyk > Labels: requires-design-doc > > A bipartite graph is a graph for which the set of vertices can be divided > into two disjoint sets such that each edge having a source vertex in the > first set, will have a target vertex in the second set. We would like to > support efficient operations for this type of graphs along with a set of > metrics(http://jponnela.com/web_documents/twomode.pdf).
[GitHub] flink issue #2564: [FLINK-2254] Add BipartiteGraph class
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2564 @vasia I don't think anybody is shepherding this PR :)
[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2664 How about just building a fat jar for each connector / library? That way it becomes quite easy for users - they simply refer to one jar.
[jira] [Commented] (FLINK-4861) Package optional project artifacts
[ https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693446#comment-15693446 ] ASF GitHub Bot commented on FLINK-4861: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2664 How about just building a fat jar for each connector / library? That way it becomes quite easy for users - they simply refer to one jar. > Package optional project artifacts > -- > > Key: FLINK-4861 > URL: https://issues.apache.org/jira/browse/FLINK-4861 > Project: Flink > Issue Type: New Feature > Components: Build System >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Per the mailing list > [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html], > package the Flink libraries and connectors into subdirectories of a new > {{opt}} directory in the release/snapshot tarballs.
[jira] [Created] (FLINK-5157) Extending AllWindow Function Metadata
Ventura Del Monte created FLINK-5157: Summary: Extending AllWindow Function Metadata Key: FLINK-5157 URL: https://issues.apache.org/jira/browse/FLINK-5157 Project: Flink Issue Type: New Feature Components: DataStream API, Streaming, Windowing Operators Reporter: Ventura Del Monte Assignee: Ventura Del Monte Fix For: 1.2.0 Following the logic behind [1,2], ProcessAllWindowFunction can be introduced in Flink and AllWindowedStream can be extended in order to support them. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata [2] https://issues.apache.org/jira/browse/FLINK-4997
[GitHub] flink issue #2856: Removed excessive tests.
Github user chermenin commented on the issue: https://github.com/apache/flink/pull/2856 Done.
[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693381#comment-15693381 ] ASF GitHub Bot commented on FLINK-4712: --- Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r89503899 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala --- @@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { */ def evaluate[Testing, PredictionValue]( testing: DataSet[Testing], - evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit - evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) + evaluateParameters: ParameterMap = ParameterMap.Empty) + (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) : DataSet[(PredictionValue, PredictionValue)] = { FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) evaluator.evaluateDataSet(this, evaluateParameters, testing) } } +trait RankingPredictor[Self] extends Estimator[Self] with WithParameters { + that: Self => + + def predictRankings( +k: Int, +users: DataSet[Int], +predictParameters: ParameterMap = ParameterMap.Empty)(implicit +rankingPredictOperation : RankingPredictOperation[Self]) + : DataSet[(Int,Int,Int)] = +rankingPredictOperation.predictRankings(this, k, users, predictParameters) + + def evaluateRankings( +testing: DataSet[(Int,Int,Double)], +evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit +rankingPredictOperation : RankingPredictOperation[Self]) + : DataSet[(Int,Int,Int)] = { +// todo: do not burn 100 topK into code +predictRankings(100, testing.map(_._1).distinct(), evaluateParameters) + } +} + +trait RankingPredictOperation[Instance] { + def predictRankings( +instance: Instance, +k: Int, +users: DataSet[Int], +predictParameters: ParameterMap = ParameterMap.Empty) + : DataSet[(Int, Int, Int)] +} + +/** + * Trait for 
providing auxiliary data for ranking evaluations. + * + * They are useful e.g. for excluding items found in the training [[DataSet]] + * from the recommended top K items. + */ +trait TrainingRatingsProvider { + + def getTrainingData: DataSet[(Int, Int, Double)] + + /** +* Retrieving the training items. +* Although this can be calculated from the training data, it requires a costly +* [[DataSet.distinct]] operation, while in matrix factor models the set items could be +* given more efficiently from the item factors. +*/ + def getTrainingItems: DataSet[Int] = { +getTrainingData.map(_._2).distinct() + } +} + +/** + * Ranking predictions for the most common case. + * If we can predict ratings, we can compute top K lists by sorting the predicted ratings. + */ +class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider] +(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)]) + extends RankingPredictOperation[Instance] { + + private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)]) + : DataSet[(Int, Int)] = { +users.cross(items) --- End diff -- I completely agree. There are use-cases where we would not like to give rankings from all the items. E.g. when recommending TV programs, we would only like to recommend currently running TV programs, but train on all of them. We'll include an `item` DataSet parameter to ranking predictions. (Btw. I believe the "Flink-way" is to let the user configure as much as possible, but that's just my opinion :) ) > Implementing ranking predictions for ALS > > > Key: FLINK-4712 > URL: https://issues.apache.org/jira/browse/FLINK-4712 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Domokos Miklós Kelen >Assignee: Gábor Hermann > > We started working on implementing ranking predictions for recommender > systems. 
Ranking prediction means that beside predicting scores for user-item > pairs, the recommender system is able to recommend a top K list for the users. > Details: > In practice, this would mean finding the K items for a particular user with > the highest predicted rating. It should be possible also to specify whether > to exclude the already seen items from a particular user's toplist. (See for > ex
[GitHub] flink pull request #2838: [FLINK-4712] [FLINK-4713] [ml] Ranking recommendat...
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r89503899 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala --- @@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { */ def evaluate[Testing, PredictionValue]( testing: DataSet[Testing], - evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit - evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) + evaluateParameters: ParameterMap = ParameterMap.Empty) + (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) : DataSet[(PredictionValue, PredictionValue)] = { FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) evaluator.evaluateDataSet(this, evaluateParameters, testing) } } +trait RankingPredictor[Self] extends Estimator[Self] with WithParameters { + that: Self => + + def predictRankings( +k: Int, +users: DataSet[Int], +predictParameters: ParameterMap = ParameterMap.Empty)(implicit +rankingPredictOperation : RankingPredictOperation[Self]) + : DataSet[(Int,Int,Int)] = +rankingPredictOperation.predictRankings(this, k, users, predictParameters) + + def evaluateRankings( +testing: DataSet[(Int,Int,Double)], +evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit +rankingPredictOperation : RankingPredictOperation[Self]) + : DataSet[(Int,Int,Int)] = { +// todo: do not burn 100 topK into code +predictRankings(100, testing.map(_._1).distinct(), evaluateParameters) + } +} + +trait RankingPredictOperation[Instance] { + def predictRankings( +instance: Instance, +k: Int, +users: DataSet[Int], +predictParameters: ParameterMap = ParameterMap.Empty) + : DataSet[(Int, Int, Int)] +} + +/** + * Trait for providing auxiliary data for ranking evaluations. + * + * They are useful e.g. for excluding items found in the training [[DataSet]] + * from the recommended top K items. 
+ */ +trait TrainingRatingsProvider { + + def getTrainingData: DataSet[(Int, Int, Double)] + + /** +* Retrieving the training items. +* Although this can be calculated from the training data, it requires a costly +* [[DataSet.distinct]] operation, while in matrix factor models the set items could be +* given more efficiently from the item factors. +*/ + def getTrainingItems: DataSet[Int] = { +getTrainingData.map(_._2).distinct() + } +} + +/** + * Ranking predictions for the most common case. + * If we can predict ratings, we can compute top K lists by sorting the predicted ratings. + */ +class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider] +(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)]) + extends RankingPredictOperation[Instance] { + + private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)]) + : DataSet[(Int, Int)] = { +users.cross(items) --- End diff -- I completely agree. There are use-cases where we would not like to give rankings from all the items. E.g. when recommending TV programs, we would only like to recommend currently running TV programs, but train on all of them. We'll include an `item` DataSet parameter to ranking predictions. (Btw. I believe the "Flink-way" is to let the user configure as much as possible, but that's just my opinion :) )
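Stripped of the Flink plumbing, the RankingFromRatingPredictOperation discussed above amounts to: score candidate (user, item) pairs, then keep the K items with the highest predicted rating per user. A stdlib-only Java sketch of that final step (toy scores; the real code works on DataSets and additionally excludes training pairs):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopKRankingSketch {

    // Rows are {user, item, predictedRating}; returns user -> top-K items.
    public static Map<Integer, List<Integer>> topK(int[][] scored, int k) {
        return Arrays.stream(scored)
                .collect(Collectors.groupingBy(t -> t[0]))
                .entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> e.getValue().stream()
                                .sorted((a, b) -> Integer.compare(b[2], a[2])) // rating desc
                                .limit(k)
                                .map(t -> t[1])
                                .collect(Collectors.toList())));
    }

    public static void main(String[] args) {
        int[][] scored = { {1, 10, 5}, {1, 11, 3}, {1, 12, 4}, {2, 10, 2}, {2, 11, 9} };
        System.out.println(topK(scored, 2)); // {1=[10, 12], 2=[11, 10]}
    }
}
```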
[jira] [Commented] (FLINK-4450) update storm version to 1.0.0
[ https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693361#comment-15693361 ] ASF GitHub Bot commented on FLINK-4450: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2439 Thanks! You can squash commits by rebasing. Have a look here: http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html > update storm version to 1.0.0 > - > > Key: FLINK-4450 > URL: https://issues.apache.org/jira/browse/FLINK-4450 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: yuzhongliu > Fix For: 2.0.0 > > > The storm package path was changed in new version > storm old version package: > backtype.storm.* > storm new version pachage: > org.apache.storm.* > shall we update flink/flink-storm code to new storm version?
[jira] [Commented] (FLINK-5055) Security feature crashes JM for certain Hadoop versions even though using no Kerberos
[ https://issues.apache.org/jira/browse/FLINK-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693362#comment-15693362 ] Maximilian Michels commented on FLINK-5055: --- That seems like a logical explanation. However, Till and I were able to reproduce the problem with a standard Hadoop config (security turned off). I'm looking into the cause of the problem now. > Security feature crashes JM for certain Hadoop versions even though using no > Kerberos > - > > Key: FLINK-5055 > URL: https://issues.apache.org/jira/browse/FLINK-5055 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Maximilian Michels >Priority: Critical > Fix For: 1.2.0 > > > A user reported [1] that the {{JobManager}} does not start when using Flink > with Hadoop-2.7.0-mapr-1607 and no security activated because of > {code} > javax.security.auth.login.LoginException: Unable to obtain Principal Name for > authentication > at > com.sun.security.auth.module.Krb5LoginModule.promptForName(Krb5LoginModule.java:841) > at > com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:704) > at > com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617) > {code} > It seems that this Hadoop version always tries to login via Kerberos even > though the user did not activate it and, thus, should use > {{AuthenticationMode.SIMPLE}}. > I'm not really familiar with the security feature, but my understanding is > that it should not have any effect on Flink when not activated. I might be > wrong here, but if not, then we should fix this problem for 1.2.0 because it > prevents people from using Flink. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-using-Yarn-on-MapR-td14484.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693360#comment-15693360 ] ASF GitHub Bot commented on FLINK-4712: --- Github user gaborhermann commented on the issue: https://github.com/apache/flink/pull/2838 Thanks again for taking a look at our PR! I've just realized from a developer mailing list thread that the FlinkML API is not set in stone until 2.0, and it's nice to hear that :) The problem is not with `evaluate(test: TestType): DataSet[Double]` but rather with `evaluate(test: TestType): DataSet[(Prediction,Prediction)]`. It's at least confusing to have both, and it might not be worth exposing the one giving `(Prediction,Prediction)` pairs to the user, as it only *prepares* evaluation. By introducing the evaluation framework, we could at least rename it to something like `preparePairwiseEvaluation(test: TestType): DataSet[(Prediction,Prediction)]`. In the ranking case we might generalize it to `prepareEvaluation(test: TestType): PreparedTesting`. We basically did this with the `PrepareDataSetOperation`; we've just left the old `evaluate` as it is for now. I suggest changing this if we can break the API. I'll do a rebase on the cross-validation PR. At first glance, it should not really be a problem to do both cross-validation and hyper-parameter tuning, as the user has to provide a `Scorer` anyway. A minor issue I see is the user falling back to a default `score` (e.g. RMSE in the case of ALS). This might not be a problem for recommendation models that give rating predictions besides ranking predictions, but it is a problem for models that *only* give ranking predictions, because those do not extend the `Predictor` class. This is not an issue for now, but might become one when adding more recommendation models. Should we try to do this now, or is it a bit "overengineering"? I'll see if any other problem comes up after rebasing. 
The `RankingPredictor` interface is useful *internally* for the `Score`s. It serves as a contract between a `RankingScore` and the model. I'm sure it will be used only for recommendations, but it's no effort to expose it, so the user can write code against a general `RankingPredictor` (although I do not think this is what users would like to do :) ). A better question is whether to use it in a `Pipeline`. We discussed this with some people, and could not really find a use-case where we need `Transformer`-like preprocessing for recommendations. Of course, there could be other preprocessing steps, such as removing/aggregating duplicates, but those do not have to be `fit` to training data. Based on this, it's not worth the effort to integrate `RankingPredictor` with the `Pipeline`, at least for now. > Implementing ranking predictions for ALS > > > Key: FLINK-4712 > URL: https://issues.apache.org/jira/browse/FLINK-4712 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Domokos Miklós Kelen >Assignee: Gábor Hermann > > We started working on implementing ranking predictions for recommender > systems. Ranking prediction means that besides predicting scores for user-item > pairs, the recommender system is able to recommend a top K list for the users. > Details: > In practice, this would mean finding the K items for a particular user with > the highest predicted rating. It should also be possible to specify whether > to exclude the already seen items from a particular user's toplist. (See for > example the 'exclude_known' setting of [Graphlab Create's ranking > factorization > recommender|https://turi.com/products/create/docs/generated/graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend.html#graphlab.recommender.ranking_factorization_recommender.RankingFactorizationRecommender.recommend] > ). 
> The output of the topK recommendation function could be in the form of > {{DataSet[(Int,Int,Int)]}}, meaning (user, item, rank), similar to Graphlab > Create's output. However, this is arguable: follow-up work includes > implementing ranking recommendation evaluation metrics (such as precision@k, > recall@k, ndcg@k), similar to [Spark's > implementations|https://spark.apache.org/docs/1.5.0/mllib-evaluation-metrics.html#ranking-systems]. > It would be beneficial if we were able to design the API such that it could > be included in the proposed evaluation framework (see > [FLINK-2157|https://issues.apache.org/jira/browse/FLINK-2157]), which makes it > necessary to consider the possible output type {{DataSet[(Int, > Array[Int])]}} or {{DataSet[(Int, Array[(Int,Double)])]}} meaning (user, > array of items), possibly including the predicted s
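Of the ranking metrics mentioned above, precision@k is the simplest to state: the fraction of the top-k recommended items that are actually relevant. A minimal, framework-free Python sketch (the function name is illustrative, not any library's API):

```python
def precision_at_k(recommended, relevant, k):
    """Fraction of the first k recommended items that appear in the
    relevant set. `recommended` is an ordered list, `relevant` a set."""
    if k <= 0:
        raise ValueError("k must be positive")
    top_k = recommended[:k]
    return sum(1 for item in top_k if item in relevant) / k
```

Libraries such as Spark's `RankingMetrics` (linked above) compute this kind of per-user value and then average it across all users.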
[jira] [Assigned] (FLINK-5055) Security feature crashes JM for certain Hadoop versions even though using no Kerberos
[ https://issues.apache.org/jira/browse/FLINK-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-5055: - Assignee: Maximilian Michels
[jira] [Commented] (FLINK-4450) update storm version to 1.0.0
[ https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693353#comment-15693353 ] ASF GitHub Bot commented on FLINK-4450: --- Github user liuyuzhong commented on the issue: https://github.com/apache/flink/pull/2439 Compile error is fixed. But how to squash all commits into one commit? > update storm version to 1.0.0 > - > > Key: FLINK-4450 > URL: https://issues.apache.org/jira/browse/FLINK-4450 > Project: Flink > Issue Type: Improvement > Components: flink-contrib >Reporter: yuzhongliu > Fix For: 2.0.0 > > > The storm package path was changed in the new version > storm old version package: > backtype.storm.* > storm new version package: > org.apache.storm.* > shall we update flink/flink-storm code to the new storm version? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5156) Consolidate streaming FieldAccessor functionality
Márton Balassi created FLINK-5156: - Summary: Consolidate streaming FieldAccessor functionality Key: FLINK-5156 URL: https://issues.apache.org/jira/browse/FLINK-5156 Project: Flink Issue Type: Task Reporter: Márton Balassi The streaming FieldAccessors (keyedStream.keyBy(...)) have slightly different semantics compared to their batch counterparts. Currently the streaming ones allow selecting a field within an array (which might be dangerous as the array typeinfo does not contain the length of the array, thus leading to a potential index out of bounds) and accept not only "*", but also "0" to select a whole type. This functionality should be either removed or documented. The latter can be achieved by effectively reverting [1]. Note that said commit was squashed before merging. [1] https://github.com/mbalassi/flink/commit/237f07eb113508703c980b14587d66970e7f6251 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4997) Extending Window Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-4997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693232#comment-15693232 ] ASF GitHub Bot commented on FLINK-4997: --- Github user VenturaDelMonte commented on the issue: https://github.com/apache/flink/pull/2756 @manuzhang thank you for your feedback and no problem! @aljoscha 👍 > Extending Window Function Metadata > -- > > Key: FLINK-4997 > URL: https://issues.apache.org/jira/browse/FLINK-4997 > Project: Flink > Issue Type: New Feature > Components: DataStream API, Streaming, Windowing Operators >Reporter: Ventura Del Monte >Assignee: Ventura Del Monte > Fix For: 1.2.0 > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
[ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693171#comment-15693171 ] ASF GitHub Bot commented on FLINK-3702: --- Github user mbalassi commented on the issue: https://github.com/apache/flink/pull/2094 Merging... > DataStream API PojoFieldAccessor doesn't support nested POJOs > - > > Key: FLINK-3702 > URL: https://issues.apache.org/jira/browse/FLINK-3702 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Gabor Gevay > > The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar > methods) doesn't support nested POJOs right now. > As part of FLINK-3697 I'll add a check for a nested POJO and fail with an > exception. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4712) Implementing ranking predictions for ALS
[ https://issues.apache.org/jira/browse/FLINK-4712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693161#comment-15693161 ] ASF GitHub Bot commented on FLINK-4712: --- Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/2838#discussion_r89489117 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala --- @@ -72,14 +77,142 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { */ def evaluate[Testing, PredictionValue]( testing: DataSet[Testing], - evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit - evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) + evaluateParameters: ParameterMap = ParameterMap.Empty) + (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) : DataSet[(PredictionValue, PredictionValue)] = { FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) evaluator.evaluateDataSet(this, evaluateParameters, testing) } } +trait RankingPredictor[Self] extends Estimator[Self] with WithParameters { + that: Self => + + def predictRankings( +k: Int, +users: DataSet[Int], +predictParameters: ParameterMap = ParameterMap.Empty)(implicit +rankingPredictOperation : RankingPredictOperation[Self]) + : DataSet[(Int,Int,Int)] = +rankingPredictOperation.predictRankings(this, k, users, predictParameters) + + def evaluateRankings( +testing: DataSet[(Int,Int,Double)], +evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit +rankingPredictOperation : RankingPredictOperation[Self]) + : DataSet[(Int,Int,Int)] = { +// todo: do not burn 100 topK into code +predictRankings(100, testing.map(_._1).distinct(), evaluateParameters) + } +} + +trait RankingPredictOperation[Instance] { + def predictRankings( +instance: Instance, +k: Int, +users: DataSet[Int], +predictParameters: ParameterMap = ParameterMap.Empty) + : DataSet[(Int, Int, Int)] +} + +/** + * Trait for 
providing auxiliary data for ranking evaluations. + * + * They are useful e.g. for excluding items found in the training [[DataSet]] + * from the recommended top K items. + */ +trait TrainingRatingsProvider { + + def getTrainingData: DataSet[(Int, Int, Double)] + + /** +* Retrieving the training items. +* Although this can be calculated from the training data, it requires a costly +* [[DataSet.distinct]] operation, while in matrix factor models the set items could be +* given more efficiently from the item factors. +*/ + def getTrainingItems: DataSet[Int] = { +getTrainingData.map(_._2).distinct() + } +} + +/** + * Ranking predictions for the most common case. + * If we can predict ratings, we can compute top K lists by sorting the predicted ratings. + */ +class RankingFromRatingPredictOperation[Instance <: TrainingRatingsProvider] +(val ratingPredictor: PredictDataSetOperation[Instance, (Int, Int), (Int, Int, Double)]) + extends RankingPredictOperation[Instance] { + + private def getUserItemPairs(users: DataSet[Int], items: DataSet[Int], exclude: DataSet[(Int, Int)]) + : DataSet[(Int, Int)] = { +users.cross(items) --- End diff -- You're right. Although there's not much we can do generally to avoid this, we might be able to optimize for matrix factorization. This solution works for *every* predictor that predicts ratings, and we currently use it in ALS ([here](https://github.com/apache/flink/pull/2838/files/45c98a97ef82d1012062dbcf6ade85a8d566062d#diff-80639a21b8fd166b5f7df5280cd609a9R467)). With a matrix factorization model *specifically*, we can avoid materializing all user-item pairs as tuples, and compute the rankings more directly, and that might be more efficient. So we could use a more specific `RankingPredictor` implementation in `ALS`. But even in that case, we still need to go through all the items for a particular user to calculate the top k items for that user. Also this is only calculated with for the users we'd like to give rankings to. E.g. 
in a testing scenario, for the users in the test data, which might be significantly fewer than the users in the training data. I suggest keeping this anyway, as it is general. We might come up with a solution that's slightly more efficient in most cases for MF models. Should we put effort into working on it? What do you think? > Implementing ranking predictions for ALS > > > Key: FLINK
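The matrix-factorization shortcut discussed in this thread — scoring items for a user directly from the learned factors instead of materializing every (user, item) pair as a tuple — can be illustrated with NumPy. This is a rough sketch under assumed data shapes, not the proposed Flink operation; `mf_top_k` is a hypothetical name:

```python
import numpy as np

def mf_top_k(user_factors, item_factors, users, k):
    """For each requested user, score every item as the dot product of the
    item-factor matrix with the user's factor vector, then keep the k
    highest-scoring item indices. `user_factors` maps user id -> vector;
    `item_factors` is an (n_items, n_latent) matrix."""
    result = {}
    for u in users:
        scores = item_factors @ user_factors[u]  # one score per item
        result[u] = np.argsort(-scores)[:k].tolist()
    return result
```

Note that even this variant still touches all items per requested user; the saving is in avoiding the intermediate cross-product DataSet, which is what the comment above is weighing.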
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693124#comment-15693124 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89482516 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. + */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinRowType: RelDataType, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowCross( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinRowType, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) => + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) +} + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val broadcastSetName = "joinSet" +val mapSideJoin = generateMapFunction( + tableEnv.getConfig, + leftDataSet.getType, + rightDataSet.getType, + leftIsSingle, + broadcastSetName, + expectedType) + +val (multiRowDataSet, singleRowDataSet) = + if (leftIsSingle) { +(rightDataSet, leftDataSet) + } else { +(leftDataSet, rightDataSet) + } + +multiRowDataSet + .map(mapSideJoin) + .withBroadcastSet(singleRowDataSet, broadcastSetName) + .name(getMapOperatorName)
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89461812 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowCross} + +class DataSetSingleRowCrossRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetSingleRowCrossRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join = call.rel(0).asInstanceOf[LogicalJoin] + +if (isCrossJoin(join)) { + isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) || + isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) +} else { + false +} + } + + private def isCrossJoin(join: LogicalJoin) = { +val joinCondition = join.analyzeCondition +joinCondition.isEqui && joinCondition.pairs().isEmpty + } + + private def isGlobalAggregation(node: RelNode) = { +node.isInstanceOf[LogicalAggregate] && + isSingleLine(node.asInstanceOf[LogicalAggregate]) + } + + private def isSingleLine(agg: LogicalAggregate) = { +agg.getGroupSets.size() == 1 && --- End diff -- I saw in a comment in Calcite that `groupSets` may be null. For safety, we should change the condition to `groupSets == null || (groupSets.size() == 1 && groupSets.get(0).isEmpty)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
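The null-safe check suggested in the review could look like the sketch below. This is illustrative only, not the merged implementation; it assumes Calcite's `LogicalAggregate.getGroupSets` API and treats a null `groupSets` the same as a single empty group set, i.e. a global aggregation.

```scala
// Sketch of the null-safe variant of isSingleLine(); `agg` is Calcite's
// LogicalAggregate. Not compilable without Calcite on the classpath.
private def isSingleLine(agg: LogicalAggregate): Boolean = {
  val groupSets = agg.getGroupSets
  // a Calcite comment notes groupSets may be null; guard before dereferencing
  groupSets == null ||
    (groupSets.size() == 1 && groupSets.get(0).isEmpty)
}
```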
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693131#comment-15693131 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89483428 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class RichMapRunner[IN, OUT]( --- End diff -- I think we need two map join runner classes, one for each side of the broadcasted value. Could be named `MapJoinLeftRunner` and `MapJoinRightRunner`. 
> Support for SQL NOT IN operator > --- > > Key: FLINK-4541 > URL: https://issues.apache.org/jira/browse/FLINK-4541 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alexander Shoshin > > This should work: > {code} > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > // register the DataSet as table "WordCount" > tEnv.registerDataSet("WordCount", input, 'word, 'frequency) > tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, > 'frequency).select('word).filter('word !== "hello")) > // run a SQL query on the Table and retrieve the result as a new Table > val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE > word NOT IN (SELECT word FROM WordCount2) GROUP BY word") > table.toDataSet[WC].print() > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89486698 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowCross} + +class DataSetSingleRowCrossRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetSingleRowCrossRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join = call.rel(0).asInstanceOf[LogicalJoin] + +if (isCrossJoin(join)) { --- End diff -- We need to check that the `LogicalJoin.joinType` is `INNER`. 
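Folding the suggested join-type check into `matches()` might look as follows. This is a sketch only: it assumes Calcite's `Join.getJoinType`/`JoinRelType` API and keeps the rest of the PR's condition unchanged.

```scala
// Sketch of matches() extended with the INNER-join check suggested above.
// Only inner joins are safe to rewrite into a broadcast cross; outer joins
// must keep their null-padding semantics.
override def matches(call: RelOptRuleCall): Boolean = {
  val join = call.rel(0).asInstanceOf[LogicalJoin]
  join.getJoinType == JoinRelType.INNER &&
    isCrossJoin(join) &&
    (isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) ||
     isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal))
}
```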
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693125#comment-15693125 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89480767 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. + */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinRowType: RelDataType, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowCross( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinRowType, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) => + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) +} + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val broadcastSetName = "joinSet" +val mapSideJoin = generateMapFunction( + tableEnv.getConfig, + leftDataSet.getType, + rightDataSet.getType, + leftIsSingle, + broadcastSetName, + expectedType) + +val (multiRowDataSet, singleRowDataSet) = + if (leftIsSingle) { +(rightDataSet, leftDataSet) + } else { +(leftDataSet, rightDataSet) + } + +multiRowDataSet + .map(mapSideJoin) + .withBroadcastSet(singleRowDataSet, broadcastSetName) + .name(getMapOperatorName)
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89480684 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. 
+ */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinRowType: RelDataType, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowCross( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinRowType, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) => + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) +} + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val broadcastSetName = "joinSet" +val mapSideJoin = generateMapFunction( + tableEnv.getConfig, + leftDataSet.getType, + rightDataSet.getType, + leftIsSingle, + broadcastSetName, + expectedType) + +val (multiRowDataSet, singleRowDataSet) = + if (leftIsSingle) { +(rightDataSet, leftDataSet) + } else { +(leftDataSet, rightDataSet) + } + +multiRowDataSet + .map(mapSideJoin) + 
.withBroadcastSet(singleRowDataSet, broadcastSetName) + .name(getMapOperatorName) + .asInstanceOf[DataSet[Any]] + } + + private def generateMapFunction( --- End diff -- Can we include the join condition here as well?
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693126#comment-15693126 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89483310 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. + */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, --- End diff -- Add the join condition. 
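One way to carry the join condition as requested is to thread it through the RelNode's constructor. The parameter order and the `RexNode` type below are assumptions for illustration, not the merged signature:

```scala
// Hypothetical sketch: DataSetSingleRowCross extended with the join
// condition, taken as the RexNode of the matched LogicalJoin. The copy()
// override would forward it unchanged.
class DataSetSingleRowCross(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    leftNode: RelNode,
    rightNode: RelNode,
    leftIsSingle: Boolean,
    joinCondition: RexNode, // added: predicate to evaluate per joined pair
    rowRelDataType: RelDataType,
    joinRowType: RelDataType,
    ruleDescription: String)
  extends BiRel(cluster, traitSet, leftNode, rightNode)
  with DataSetRel
```

The code generator could then translate `joinCondition` into the generated map function so non-matching pairs are filtered at runtime.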
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693120#comment-15693120 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89483891 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class RichMapRunner[IN, OUT]( +name: String, +code: String, +@transient returnType: TypeInformation[OUT]) + extends RichMapFunction[IN, OUT] --- End diff -- extend from `RichFlatMapFunction`. `DataSetJoin` uses a `FlatJoinFunction` which emits records via a `Collector`. Wrapping this `FlatJoinFunction` will be easier from a `FlatMapFunction` than from a `MapFunction`. 
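A FlatMap-based runner along the lines suggested could be sketched as below. Everything here is a hypothetical outline: the class name follows the reviewer's proposal, the broadcast-set name and the code-compilation step are assumptions, and the generated `FlatJoinFunction` instantiation is elided.

```scala
// Sketch of a left-side runner as a RichFlatMapFunction, so the wrapped
// FlatJoinFunction can emit zero or more records via the Collector --
// something a plain MapFunction cannot express.
class MapJoinLeftRunner[IN1, IN2, OUT](
    name: String,
    code: String,
    @transient returnType: TypeInformation[OUT])
  extends RichFlatMapFunction[IN1, OUT]
  with ResultTypeQueryable[OUT] {

  private var function: FlatJoinFunction[IN1, IN2, OUT] = _
  private var broadcastRow: IN2 = _

  override def open(parameters: Configuration): Unit = {
    function = ??? // compile and instantiate the generated FlatJoinFunction
    // the single-row side arrives as a one-element broadcast set
    broadcastRow = getRuntimeContext
      .getBroadcastVariable[IN2]("joinSet").get(0)
  }

  override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit =
    function.join(multiInput, broadcastRow, out)

  override def getProducedType: TypeInformation[OUT] = returnType
}
```

A `MapJoinRightRunner` would be identical except that it passes the broadcast row as the first argument of `join`.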
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89465992 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. 
+ */ +class DataSetSingleRowCross( --- End diff -- Rename to `DataSetSingleRowJoin` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89483310 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. 
+ */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, --- End diff -- Add the join condition.
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693128#comment-15693128 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89466031 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowCross} + +class DataSetSingleRowCrossRule --- End diff -- Rename to `DataSetSingleRowJoinRule` > Support for SQL NOT IN operator > --- > > Key: FLINK-4541 > URL: https://issues.apache.org/jira/browse/FLINK-4541 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alexander Shoshin > > This should work: > {code} > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > // register the DataSet as table "WordCount" > tEnv.registerDataSet("WordCount", input, 'word, 'frequency) > tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, > 'frequency).select('word).filter('word !== "hello")) > // run a SQL query on the Table and retrieve the result as a new Table > val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE > word NOT IN (SELECT word FROM WordCount2) GROUP BY word") > table.toDataSet[WC].print() > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693119#comment-15693119 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89486698 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowCross} + +class DataSetSingleRowCrossRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetSingleRowCrossRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join = call.rel(0).asInstanceOf[LogicalJoin] + +if (isCrossJoin(join)) { --- End diff -- We need to check that the `LogicalJoin.joinType` is `INNER`.
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89483428 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class RichMapRunner[IN, OUT]( --- End diff -- I think we need two map join runner classes, one for each side of the broadcasted value. Could be named `MapJoinLeftRunner` and `MapJoinRightRunner`.
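The left/right split suggested above can be illustrated with a small sketch. This is plain Scala with a simplified stand-in for the code-generated join function; only the names `MapJoinLeftRunner` and `MapJoinRightRunner` come from the review, everything else is hypothetical:

```scala
// Simplified stand-in for the generated join function: it receives the two
// join inputs in a fixed (left, right) order.
trait JoinFunction[L, R, OUT] { def join(left: L, right: R): OUT }

// Hypothetical runners, one per side on which the broadcast single row appears.
// Both map over the multi-row input; they differ only in the argument position
// of the broadcast row when calling the generated join function.
class MapJoinLeftRunner[IN, SINGLE, OUT](
    function: JoinFunction[SINGLE, IN, OUT], single: SINGLE) {
  def map(value: IN): OUT = function.join(single, value) // single row is the left input
}

class MapJoinRightRunner[IN, SINGLE, OUT](
    function: JoinFunction[IN, SINGLE, OUT], single: SINGLE) {
  def map(value: IN): OUT = function.join(value, single) // single row is the right input
}
```

Splitting the runner per side keeps the generated join code oblivious to which input was broadcast.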
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693123#comment-15693123 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89465952 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. --- End diff -- Update to `Flink RelNode that executes a Join where one input is a single row.`
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693118#comment-15693118 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89483366 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowCross} + +class DataSetSingleRowCrossRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetSingleRowCrossRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join = call.rel(0).asInstanceOf[LogicalJoin] + +if (isCrossJoin(join)) { + isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) || + isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) +} else { + false +} + } + + private def isCrossJoin(join: LogicalJoin) = { +val joinCondition = join.analyzeCondition +joinCondition.isEqui && joinCondition.pairs().isEmpty + } + + private def isGlobalAggregation(node: RelNode) = { +node.isInstanceOf[LogicalAggregate] && + isSingleLine(node.asInstanceOf[LogicalAggregate]) + } + + private def isSingleLine(agg: LogicalAggregate) = { +agg.getGroupSets.size() == 1 && +agg.getGroupSets.get(0).isEmpty && +agg.getGroupSet.isEmpty + } + + override def convert(rel: RelNode): RelNode = { +val join = rel.asInstanceOf[LogicalJoin] +val traitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) +val dataSetLeftNode = RelOptRule.convert(join.getLeft, DataSetConvention.INSTANCE) +val dataSetRightNode = RelOptRule.convert(join.getRight, DataSetConvention.INSTANCE) +val leftIsSingle = isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) + +new DataSetSingleRowCross( --- End diff -- Add the join condition. 
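The `matches` check above relies on the invariant that a global (non-grouped) aggregate produces exactly one row, which is what makes the single-row broadcast join safe to apply. A plain-Scala illustration of that invariant (all names here are hypothetical, not Flink API):

```scala
// A global aggregation (empty grouping set, i.e. no GROUP BY) folds the whole
// input into a single result row, independent of the input's size.
def globalAggregate[T, A](rows: Seq[T])(zero: A)(fold: (A, T) => A): Seq[A] =
  Seq(rows.foldLeft(zero)(fold)) // always exactly one row

// With a GROUP BY, the row count depends on the number of distinct keys,
// so the single-row join rule must not match such aggregates.
def groupedAggregate[T, K, A](rows: Seq[T])(key: T => K)(zero: A)(fold: (A, T) => A): Map[K, A] =
  rows.groupBy(key).map { case (k, vs) => k -> vs.foldLeft(zero)(fold) }
```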
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693121#comment-15693121 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89480684 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. + */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinRowType: RelDataType, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowCross( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinRowType, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, 
child) => + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) +} + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val broadcastSetName = "joinSet" +val mapSideJoin = generateMapFunction( + tableEnv.getConfig, + leftDataSet.getType, + rightDataSet.getType, + leftIsSingle, + broadcastSetName, + expectedType) + +val (multiRowDataSet, singleRowDataSet) = + if (leftIsSingle) { +(rightDataSet, leftDataSet) + } else { +(leftDataSet, rightDataSet) + } + +multiRowDataSet + .map(mapSideJoin) + .withBroadcastSet(singleRowDataSet, broadcastSetName) + .name(getMapOperatorName)
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89462392 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class RichMapRunner[IN, OUT]( +name: String, +code: String, +@transient returnType: TypeInformation[OUT]) + extends RichMapFunction[IN, OUT] + with ResultTypeQueryable[OUT] + with FunctionCompiler[RichMapFunction[IN, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: RichMapFunction[IN, OUT] = null + + override def open(parameters: Configuration): Unit = { --- End diff -- Can you also forward the `RichFunction.close()` call?
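Forwarding the lifecycle calls as requested can be sketched with simplified stand-ins. This is plain Scala with no Flink dependency; `RichLifecycle` and `MapRunner` are hypothetical names for the pattern, not Flink types:

```scala
// Simplified stand-in for Flink's RichFunction lifecycle.
trait RichLifecycle { def open(): Unit = (); def close(): Unit = () }

// The runner compiles its inner function lazily in open(). Both open() and
// close() must be forwarded so the generated function can acquire and later
// release its resources.
class MapRunner(compile: () => RichLifecycle) extends RichLifecycle {
  private var function: RichLifecycle = null

  override def open(): Unit = {
    function = compile()
    function.open() // forward open() to the compiled function
  }

  override def close(): Unit =
    if (function != null) function.close() // forward close(), guarding against close-before-open
}
```

The null guard matters because the runtime may call `close()` on a function whose `open()` never ran, e.g. after a failure during task setup.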
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693122#comment-15693122 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89461025 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowCross} + +class DataSetSingleRowCrossRule + extends ConverterRule( + classOf[LogicalJoin], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetSingleRowCrossRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val join = call.rel(0).asInstanceOf[LogicalJoin] + +if (isCrossJoin(join)) { + isGlobalAggregation(join.getRight.asInstanceOf[RelSubset].getOriginal) || + isGlobalAggregation(join.getLeft.asInstanceOf[RelSubset].getOriginal) +} else { + false +} + } + + private def isCrossJoin(join: LogicalJoin) = { +val joinCondition = join.analyzeCondition +joinCondition.isEqui && joinCondition.pairs().isEmpty --- End diff -- Actually, I think we can remove the cross join condition completely and also execute equi-joins with a single-row input as Map-Broadcast join. This execution is very lightweight and should outperform sort and hash-based strategies used for equi-joins. We need to evaluate the condition in the MapFunction though. 
> Support for SQL NOT IN operator > --- > > Key: FLINK-4541 > URL: https://issues.apache.org/jira/browse/FLINK-4541 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alexander Shoshin > > This should work: > {code} > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > // register the DataSet as table "WordCount" > tEnv.registerDataSet("WordCount", input, 'word, 'frequency) > tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, > 'frequency).select('word).filter('word !== "hello")) > // run a SQL query on the Table and retrieve the result as a new Table > val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE > word NOT IN (SELECT word FROM WordCount2) GROUP BY word") > table.toDataSet[WC].print() > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
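The map-broadcast join fhueske proposes can be modeled in plain Java (no Flink): the single-row input plays the role of the broadcast set, and the join condition is evaluated inside the map over the large input instead of by a sort- or hash-based strategy. Names and signatures here are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

class SingleRowBroadcastJoin {

    /** Pairs each matching record of the large input with the single broadcast row. */
    static <L, R> List<String> join(List<L> largeInput, R broadcastRow,
                                    BiPredicate<L, R> joinCondition) {
        List<String> joined = new ArrayList<>();
        for (L record : largeInput) {
            // The condition check moves into the map function, as the
            // review suggests, so equi- and non-equi-joins look the same.
            if (joinCondition.test(record, broadcastRow)) {
                joined.add(record + "|" + broadcastRow);
            }
        }
        return joined;
    }
}
```

A single pass over the large side suffices, which is why this is cheaper than the general join strategies.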
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89480259 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. 
+ */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinRowType: RelDataType, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowCross( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinRowType, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) => --- End diff -- I would compute the costs only for the large input. This way the costs are lower than the costs of `DataSetJoin` and the `DataSetSingleRowJoin` is preferred. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
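The costing argument above can be illustrated with toy numbers: if the single-row join charges only its large input, its cost stays strictly below a join that charges both inputs, so the optimizer prefers it. The formulas are a deliberate simplification of Calcite-style costs, not Flink's actual implementation:

```java
class JoinCostSketch {

    /** Cost when only the large (non-broadcast) input is charged. */
    static double singleRowJoinCost(double largeRowCount, double rowSize) {
        return largeRowCount * rowSize;
    }

    /** Cost when both join inputs are charged, as a regular two-input join would be. */
    static double twoInputJoinCost(double leftRowCount, double rightRowCount, double rowSize) {
        return (leftRowCount + rightRowCount) * rowSize;
    }
}
```

For any non-empty broadcast side the two-input cost is higher, which is exactly the preference the reviewer wants the planner to express.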
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89482516 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. 
+ */ +class DataSetSingleRowCross( +cluster: RelOptCluster, +traitSet: RelTraitSet, +leftNode: RelNode, +rightNode: RelNode, +leftIsSingle: Boolean, +rowRelDataType: RelDataType, +joinRowType: RelDataType, +ruleDescription: String) + extends BiRel(cluster, traitSet, leftNode, rightNode) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetSingleRowCross( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + leftIsSingle, + getRowType, + joinRowType, + ruleDescription) + } + + override def toString: String = { +s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .item("where", joinConditionToString) + .item("join", joinSelectionToString) + .item("joinType", joinTypeToString) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val children = this.getInputs +children.foldLeft(planner.getCostFactory.makeZeroCost()) { (cost, child) => + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)) +} + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) +val broadcastSetName = "joinSet" +val mapSideJoin = generateMapFunction( + tableEnv.getConfig, + leftDataSet.getType, + rightDataSet.getType, + leftIsSingle, + broadcastSetName, + expectedType) + +val (multiRowDataSet, singleRowDataSet) = + if (leftIsSingle) { +(rightDataSet, leftDataSet) + } else { +(leftDataSet, rightDataSet) + } + +multiRowDataSet + .map(mapSideJoin) + 
.withBroadcastSet(singleRowDataSet, broadcastSetName) + .name(getMapOperatorName) + .asInstanceOf[DataSet[Any]] + } + + private def generateMapFunction( --- End diff -- Actually, I think it is easier and requires less additional code to generate a regular `JoinFunction` as done in the `DataSetJoin` a
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89483891 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/RichMapRunner.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class RichMapRunner[IN, OUT]( +name: String, +code: String, +@transient returnType: TypeInformation[OUT]) + extends RichMapFunction[IN, OUT] --- End diff -- extend from `RichFlatMapFunction`. `DataSetJoin` uses a `FlatJoinFunction` which emits records via a `Collector`. Wrapping this `FlatJoinFunction` will be easier from a `FlatMapFunction` than from a `MapFunction`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
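Why wrapping a `FlatJoinFunction` is natural from a flatMap, as fhueske notes, can be sketched as follows: both emit through a collector, so the wrapper only has to supply the broadcast single row as the second join argument. The interfaces below are simplified stand-ins for Flink's, not the real API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for FlatJoinFunction: emits zero or more results via a collector.
interface FlatJoin<L, R, O> {
    void join(L left, R right, Consumer<O> collector);
}

// Stand-in for the FlatMapFunction wrapper: one large-side record in,
// zero or more joined records out.
class JoinWrappingFlatMap<L, R, O> {

    private final FlatJoin<L, R, O> joinFunction;
    private final R broadcastRow; // the single-row side of the join

    JoinWrappingFlatMap(FlatJoin<L, R, O> joinFunction, R broadcastRow) {
        this.joinFunction = joinFunction;
        this.broadcastRow = broadcastRow;
    }

    List<O> flatMap(L value) {
        List<O> out = new ArrayList<>();
        joinFunction.join(value, broadcastRow, out::add);
        return out;
    }
}
```

A plain `MapFunction` would have to force exactly one output per input, which a join cannot guarantee; the collector-based shape avoids that mismatch.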
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89466031 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSingleRowCrossRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalJoin} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSingleRowCross} + +class DataSetSingleRowCrossRule --- End diff -- Rename to `DataSetSingleRowJoinRule` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4541) Support for SQL NOT IN operator
[ https://issues.apache.org/jira/browse/FLINK-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693130#comment-15693130 ] ASF GitHub Bot commented on FLINK-4541: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89465992 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. + */ +class DataSetSingleRowCross( --- End diff -- Rename to `DataSetSingleRowJoin` > Support for SQL NOT IN operator > --- > > Key: FLINK-4541 > URL: https://issues.apache.org/jira/browse/FLINK-4541 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alexander Shoshin > > This should work: > {code} > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > // register the DataSet as table "WordCount" > tEnv.registerDataSet("WordCount", input, 'word, 'frequency) > tEnv.registerTable("WordCount2", tEnv.fromDataSet(input, 'word, > 'frequency).select('word).filter('word !== "hello")) > // run a SQL query on the Table and retrieve the result as a new Table > val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount WHERE > word NOT IN (SELECT word FROM WordCount2) GROUP BY word") > table.toDataSet[WC].print() > } > {code} -- This message was sent by 
Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2811: [FLINK-4541] Support for SQL NOT IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2811#discussion_r89465952 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowCross.scala --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.RichMapRunner +import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with CrossOperator. 
--- End diff -- Update to `Flink RelNode that executes a Join where one input is a single row.` ---
[jira] [Closed] (FLINK-5154) Duplicate TypeSerializer when writing RocksDB Snapshot
[ https://issues.apache.org/jira/browse/FLINK-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-5154. --- Resolution: Not A Problem > Duplicate TypeSerializer when writing RocksDB Snapshot > -- > > Key: FLINK-5154 > URL: https://issues.apache.org/jira/browse/FLINK-5154 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.2.0, 1.1.4 > > > Some {{TypeSerializers}} are not thread safe (for example {{KryoSerializer}}) > we have to {{duplicate()}} them when using concurrently, as happens when > performing a RocksDB snapshot. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
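The thread-safety concern described in FLINK-5154 can be modeled with a toy serializer (this is not Flink's `TypeSerializer` API): an instance with internal mutable state is unsafe to share, so each concurrent user must work on its own `duplicate()`:

```java
class StatefulSerializer {

    private final StringBuilder buffer = new StringBuilder(); // internal mutable state

    /** A fresh instance with its own buffer, safe to hand to another thread. */
    StatefulSerializer duplicate() {
        return new StatefulSerializer();
    }

    String serialize(String value) {
        buffer.setLength(0);           // unsafe if two threads share `this`
        buffer.append(value).append(';');
        return buffer.toString();
    }
}
```

Calling `duplicate()` before handing the serializer to the snapshot thread gives each side an independent buffer, which is the fix the ticket discusses.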
[jira] [Updated] (FLINK-5155) Deprecate ValueStateDescriptor constructors with default value
[ https://issues.apache.org/jira/browse/FLINK-5155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5155: Description: Having the default value in the descriptor is problematic with some serialisers and we don't lose a feature because users can always check for the null value and initialise with their own default value if necessary. Right now, we're always forcing people to specify a default value even though they don't need one. Of course, we should add constructors without a default value. was:Having the default value in the descriptor is problematic with some serialisers and we don't lose a feature because users can always check for the null value and initialise with their own default value if necessary. Right now, we're always forcing people to specify a default value even though they don't need one. > Deprecate ValueStateDescriptor constructors with default value > -- > > Key: FLINK-5155 > URL: https://issues.apache.org/jira/browse/FLINK-5155 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek > > Having the default value in the descriptor is problematic with some > serialisers and we don't lose a feature because users can always check for > the null value and initialise with their own default value if necessary. > Right now, we're always forcing people to specify a default value even though > they don't need one. > Of course, we should add constructors without a default value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
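The null-check pattern FLINK-5155 recommends in place of descriptor default values can be sketched with a hypothetical state interface (the `ValueState` model below is not Flink's real API): read the state, and fall back to the caller's own default when nothing was stored yet:

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for keyed value state; a single fixed key keeps it simple.
class ValueState<T> {
    private final Map<String, T> store = new HashMap<>();
    private final String currentKey = "key";

    T value() { return store.get(currentKey); }   // null if nothing stored
    void update(T v) { store.put(currentKey, v); }
}

class CountingFunction {

    private final ValueState<Long> count = new ValueState<>();

    long increment() {
        Long current = count.value();
        // The user-side null check replaces the descriptor's default value.
        long next = (current == null ? 0L : current) + 1L;
        count.update(next);
        return next;
    }
}
```

Nothing is lost by dropping the default from the descriptor: the fallback lives at the single call site that needs it.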
[jira] [Created] (FLINK-5155) Deprecate ValueStateDescriptor constructors with default value
Aljoscha Krettek created FLINK-5155: --- Summary: Deprecate ValueStateDescriptor constructors with default value Key: FLINK-5155 URL: https://issues.apache.org/jira/browse/FLINK-5155 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Aljoscha Krettek Having the default value in the descriptor is problematic with some serialisers and we don't lose a feature because users can always check for the null value and initialise with their own default value if necessary. Right now, we're always forcing people to specify a default value even though they don't need one.
[GitHub] flink issue #2664: [FLINK-4861] [build] Package optional project artifacts
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2664 I thought that transitive dependencies are resolved in the scope of assembly descriptors. But I'm not so sure about that anymore. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4861) Package optional project artifacts
[ https://issues.apache.org/jira/browse/FLINK-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693038#comment-15693038 ] ASF GitHub Bot commented on FLINK-4861: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2664 I thought that transitive dependencies are resolved in the scope of assembly descriptors. But I'm not so sure about that anymore. > Package optional project artifacts > -- > > Key: FLINK-4861 > URL: https://issues.apache.org/jira/browse/FLINK-4861 > Project: Flink > Issue Type: New Feature > Components: Build System >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > Per the mailing list > [discussion|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Additional-project-downloads-td13223.html], > package the Flink libraries and connectors into subdirectories of a new > {{opt}} directory in the release/snapshot tarballs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
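For reference, transitive resolution in the maven-assembly-plugin is configured per {{dependencySet}} in the assembly descriptor. A sketch of the relevant elements (element names from the plugin's descriptor format, defaults as I recall them, so worth double-checking against the plugin docs):

```xml
<assembly>
  <id>opt</id>
  <formats><format>dir</format></formats>
  <dependencySets>
    <dependencySet>
      <outputDirectory>opt</outputDirectory>
      <!-- true (the default) pulls in transitive dependencies as well -->
      <useTransitiveDependencies>true</useTransitiveDependencies>
      <!-- when true, include/exclude patterns also apply via transitive paths -->
      <useTransitiveFiltering>false</useTransitiveFiltering>
    </dependencySet>
  </dependencySets>
</assembly>
```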
[jira] [Created] (FLINK-5154) Duplicate TypeSerializer when writing RocksDB Snapshot
Aljoscha Krettek created FLINK-5154: --- Summary: Duplicate TypeSerializer when writing RocksDB Snapshot Key: FLINK-5154 URL: https://issues.apache.org/jira/browse/FLINK-5154 Project: Flink Issue Type: Bug Components: Streaming Reporter: Aljoscha Krettek Assignee: Stefan Richter Priority: Blocker Fix For: 1.2.0, 1.1.4 Some {{TypeSerializers}} are not thread-safe (for example {{KryoSerializer}}), so we have to {{duplicate()}} them when they are used concurrently, as happens when performing a RocksDB snapshot. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
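A minimal self-contained sketch of the {{duplicate()}} contract the ticket relies on. The `StatefulSerializer` class is a stand-in, not Flink's `TypeSerializer`; it only shows why a serializer with mutable internal state (analogous to `KryoSerializer`'s internal Kryo instance) must not be shared across threads and should instead be duplicated per thread.

```java
// Hypothetical stand-in illustrating the duplicate() pattern.
public class DuplicateSketch {
    static final class StatefulSerializer {
        // mutable scratch state, analogous to KryoSerializer's Kryo instance
        private final StringBuilder scratch = new StringBuilder();

        String serialize(String value) {
            scratch.setLength(0); // unsafe if two threads share this instance
            scratch.append(value.length()).append(':').append(value);
            return scratch.toString();
        }

        // A fresh instance with its own scratch state -- safe to hand to
        // another thread, e.g. the one writing a RocksDB snapshot.
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    public static void main(String[] args) {
        StatefulSerializer shared = new StatefulSerializer();
        // snapshot thread gets its own duplicate instead of the shared one
        StatefulSerializer forSnapshot = shared.duplicate();
        System.out.println(shared.serialize("abc"));       // 3:abc
        System.out.println(forSnapshot.serialize("wxyz")); // 4:wxyz
    }
}
```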
[jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693016#comment-15693016 ] ASF GitHub Bot commented on FLINK-5096: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2845 Thanks for the review @zentol ! I integrated your comments and rebased to the master. Let's see what travis has to say and then merge it if you have no further comments ;) > Make the RollingSink rescalable. > > > Key: FLINK-5096 > URL: https://issues.apache.org/jira/browse/FLINK-5096 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Integrate the RollingSink with the new state abstractions so that its > parallelism can change after restoring from a savepoint. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2845: [FLINK-5096] Make the RollingSink rescalable.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2845 Thanks for the review @zentol ! I integrated your comments and rebased to the master. Let's see what travis has to say and then merge it if you have no further comments ;)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15693008#comment-15693008 ] Fabian Hueske commented on FLINK-4565: -- Great! Looking forward to it. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Nikolay Vasilishin > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
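Semantically, an uncorrelated IN sub-query is a membership test against a set materialised once from the inner query. A minimal sketch of that evaluation in plain Java collections (not Flink's Table API; the relation and column names are made up for illustration):

```java
import java.util.*;
import java.util.stream.*;

public class InOperatorSketch {
    // SELECT orderId FROM orders WHERE userId IN (<inner query result>)
    // orders are {orderId, userId} pairs; the uncorrelated inner result is
    // materialised once into a hash set, then IN is a membership test per row.
    static List<Integer> ordersMatching(List<int[]> orders, Collection<Integer> innerResult) {
        Set<Integer> inSet = new HashSet<>(innerResult); // evaluate sub-query once
        return orders.stream()
            .filter(order -> inSet.contains(order[1]))   // IN == set membership
            .map(order -> order[0])
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<int[]> orders = Arrays.asList(
            new int[]{100, 2}, new int[]{101, 3}, new int[]{102, 7});
        List<Integer> premiumUserIds = Arrays.asList(2, 5, 7); // inner query result
        System.out.println(ordersMatching(orders, premiumUserIds)); // [100, 102]
    }
}
```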
[GitHub] flink issue #2856: Removed excessive tests.
Github user chermenin commented on the issue: https://github.com/apache/flink/pull/2856 I removed a method from ITCase only. There is `testJavaArraysAsList` method in `KryoCollectionsSerializerTest` class to check it. Yep, before merging #2623, they have not been successful.
[jira] [Commented] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite
[ https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15692971#comment-15692971 ] Tzu-Li (Gordon) Tai commented on FLINK-5075: Resolved for {{master}} via http://git-wip-us.apache.org/repos/asf/flink/commit/f5f4f7a Resolved for {{release-1.1}} via http://git-wip-us.apache.org/repos/asf/flink/commit/b9e6dcc > Kinesis consumer incorrectly determines shards as newly discovered when > tested against Kinesalite > - > > Key: FLINK-5075 > URL: https://issues.apache.org/jira/browse/FLINK-5075 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0, 1.1.4 > > > A user reported that when our Kinesis connector is used against Kinesalite > (https://github.com/mhart/kinesalite), we're incorrectly determining already > found shards as newly discovered: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html > I suspect the problem is that Kinesalite's mock Kinesis API implementation > doesn't completely match the official AWS Kinesis behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite
[ https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-5075: --- Fix Version/s: 1.1.4 > Kinesis consumer incorrectly determines shards as newly discovered when > tested against Kinesalite > - > > Key: FLINK-5075 > URL: https://issues.apache.org/jira/browse/FLINK-5075 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0, 1.1.4 > > > A user reported that when our Kinesis connector is used against Kinesalite > (https://github.com/mhart/kinesalite), we're incorrectly determining already > found shards as newly discovered: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html > I suspect the problem is that Kinesalite's mock Kinesis API implementation > doesn't completely match the official AWS Kinesis behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite
[ https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-5075. Resolution: Fixed > Kinesis consumer incorrectly determines shards as newly discovered when > tested against Kinesalite > - > > Key: FLINK-5075 > URL: https://issues.apache.org/jira/browse/FLINK-5075 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0, 1.1.4 > > > A user reported that when our Kinesis connector is used against Kinesalite > (https://github.com/mhart/kinesalite), we're incorrectly determining already > found shards as newly discovered: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html > I suspect the problem is that Kinesalite's mock Kinesis API implementation > doesn't completely match the official AWS Kinesis behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite
[ https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-5075: --- Fix Version/s: 1.2.0 > Kinesis consumer incorrectly determines shards as newly discovered when > tested against Kinesalite > - > > Key: FLINK-5075 > URL: https://issues.apache.org/jira/browse/FLINK-5075 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0, 1.1.4 > > > A user reported that when our Kinesis connector is used against Kinesalite > (https://github.com/mhart/kinesalite), we're incorrectly determining already > found shards as newly discovered: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html > I suspect the problem is that Kinesalite's mock Kinesis API implementation > doesn't completely match the official AWS Kinesis behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
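The symptom in FLINK-5075, the same shards being reported as newly discovered on every poll, comes down to deduplicating the shards seen across successive listings. A minimal sketch of that bookkeeping (plain Java with hypothetical names, not the connector's actual code): if the identifiers compared here don't line up with what the service returns, as can happen against a mock such as Kinesalite, every poll reports the same shards as "new".

```java
import java.util.*;

// Hypothetical sketch of shard-discovery bookkeeping.
public class ShardDiscoverySketch {
    // shard ids already registered by this consumer
    private final Set<String> seenShardIds = new HashSet<>();

    // Returns only the shards not seen in any earlier listing.
    public List<String> discoverNewShards(List<String> listedShardIds) {
        List<String> newlyDiscovered = new ArrayList<>();
        for (String shardId : listedShardIds) {
            if (seenShardIds.add(shardId)) { // add() returns false for duplicates
                newlyDiscovered.add(shardId);
            }
        }
        return newlyDiscovered;
    }

    public static void main(String[] args) {
        ShardDiscoverySketch discovery = new ShardDiscoverySketch();
        List<String> listing = Arrays.asList("shardId-0", "shardId-1");
        System.out.println(discovery.discoverNewShards(listing)); // both are new
        // a second identical listing should discover nothing new
        System.out.println(discovery.discoverNewShards(listing)); // []
    }
}
```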