[jira] [Commented] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite
[ https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673053#comment-15673053 ] ASF GitHub Bot commented on FLINK-5075: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2822 I'm not entirely sure whether the fix is a good workaround, or whether we should really bother with Kinesalite's incorrect behaviour at all in our Kinesis connector. I've considered the alternative of just using the `describeStream(streamName)` variant, always fetching the whole shard list and explicitly ruling out shards we've already seen. That would keep the code free of such "workarounds", but would be problematic for Kinesis users whose shard count exceeds the maximum number of shards the API can return in a single call. > Kinesis consumer incorrectly determines shards as newly discovered when > tested against Kinesalite > - > > Key: FLINK-5075 > URL: https://issues.apache.org/jira/browse/FLINK-5075 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > A user reported that when our Kinesis connector is used against Kinesalite > (https://github.com/mhart/kinesalite), we're incorrectly determining already > found shards as newly discovered: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html > I suspect the problem is that the mock Kinesis API implementation of > Kinesalite doesn't completely match the official AWS Kinesis behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-proof t...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2822 I'm not entirely sure whether the fix is a good workaround, or whether we should really bother with Kinesalite's incorrect behaviour at all in our Kinesis connector. I've considered the alternative of just using the `describeStream(streamName)` variant, always fetching the whole shard list and explicitly ruling out shards we've already seen. That would keep the code free of such "workarounds", but would be problematic for Kinesis users whose shard count exceeds the maximum number of shards the API can return in a single call. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
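For illustration, a minimal Java sketch (not taken from the connector) of the full-shard-list alternative described in the comment above: describe the stream without an exclusive start shard id and filter out shards we have already seen. The client instance, stream name and `seenShardIds` set are assumed names for this sketch; the single `describeStream` call is exactly what breaks down once a stream has more shards than one call can return.

{code}
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class FullShardListDiscovery {

    public static List<Shard> discoverNewShards(
            AmazonKinesisClient kinesis, String streamName, Set<String> seenShardIds) {

        List<Shard> newShards = new ArrayList<>();
        // Single describeStream call without an exclusiveStartShardId: simple, but the
        // API caps the number of shards returned per call, so large streams would still
        // need pagination via exclusiveStartShardId -- the very variant Kinesalite gets wrong.
        for (Shard shard : kinesis.describeStream(streamName)
                .getStreamDescription().getShards()) {
            if (seenShardIds.add(shard.getShardId())) {
                newShards.add(shard);
            }
        }
        return newShards;
    }
}
{code}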
[jira] [Commented] (FLINK-5075) Kinesis consumer incorrectly determines shards as newly discovered when tested against Kinesalite
[ https://issues.apache.org/jira/browse/FLINK-5075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673047#comment-15673047 ] ASF GitHub Bot commented on FLINK-5075: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/2822 [FLINK-5075] [kinesis] Make Kinesis consumer fail-proof to incorrect Kinesalite API behaviour A user reported that when tested against Kinesalite (a widely used mock Kinesis implementation), the connector was incorrectly determining already seen shards as newly discovered. The problem was that the connector was using the API `describeStream(streamName, exclusiveStartShardId)` to fetch shards of a stream after the provided `exclusiveStartShardId` (given as the id of the last shard we've already discovered), and Kinesalite's behaviour for this call differs from the official Kinesis API. For example, if the current complete shard list is [shard-0, shard-1, shard-2, shard-3] for "test-stream", then `describeStream("test-stream", "shard-1")` should return: [shard-2, shard-3]. Kinesalite, however, incorrectly returns the whole list. I've manually tested this change against Kinesalite, and shard discovery is working normally again. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-5075 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2822.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2822 commit cbef7eabeae645a1cd3533d3274cdb7491b1a779 Author: Tzu-Li (Gordon) Tai Date: 2016-11-17T06:24:24Z [FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour > Kinesis consumer incorrectly determines shards as newly discovered when > tested against Kinesalite > - > > Key: FLINK-5075 > URL: https://issues.apache.org/jira/browse/FLINK-5075 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > A user reported that when our Kinesis connector is used against Kinesalite > (https://github.com/mhart/kinesalite), we're incorrectly determining already > found shards as newly discovered: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Subtask-keeps-on-discovering-new-Kinesis-shard-when-using-Kinesalite-td10133.html > I suspect the problem is that the mock Kinesis API implementation of > Kinesalite doesn't completely match the official AWS Kinesis behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2822: [FLINK-5075] [kinesis] Make Kinesis consumer fail-...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/2822 [FLINK-5075] [kinesis] Make Kinesis consumer fail-proof to incorrect Kinesalite API behaviour A user reported that when tested against Kinesalite (a widely used mock Kinesis implementation), the connector was incorrectly determining already seen shards as newly discovered. The problem was that the connector was using the API `describeStream(streamName, exclusiveStartShardId)` to fetch shards of a stream after the provided `exclusiveStartShardId` (given as the id of the last shard we've already discovered), and Kinesalite's behaviour for this call differs from the official Kinesis API. For example, if the current complete shard list is [shard-0, shard-1, shard-2, shard-3] for "test-stream", then `describeStream("test-stream", "shard-1")` should return: [shard-2, shard-3]. Kinesalite, however, incorrectly returns the whole list. I've manually tested this change against Kinesalite, and shard discovery is working normally again. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-5075 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2822.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2822 commit cbef7eabeae645a1cd3533d3274cdb7491b1a779 Author: Tzu-Li (Gordon) Tai Date: 2016-11-17T06:24:24Z [FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
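As a rough illustration of the kind of defensive handling the pull request title refers to (the actual change in #2822 may differ), discovery can re-filter whatever `describeStream` returns, so a mock that ignores `exclusiveStartShardId` and returns the full list, as Kinesalite does, no longer yields duplicate "newly discovered" shards. This sketch assumes shard ids such as "shardId-000000000002" are zero-padded, so plain string comparison reflects their order.

{code}
import com.amazonaws.services.kinesis.model.Shard;

import java.util.List;
import java.util.stream.Collectors;

public class ShardFilter {

    public static List<Shard> dropAlreadySeenShards(List<Shard> returnedShards,
                                                    String exclusiveStartShardId) {
        if (exclusiveStartShardId == null) {
            return returnedShards; // first discovery attempt, nothing to filter
        }
        // Keep only shards whose id is strictly greater than the id we asked to start after.
        return returnedShards.stream()
            .filter(shard -> shard.getShardId().compareTo(exclusiveStartShardId) > 0)
            .collect(Collectors.toList());
    }
}
{code}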
[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673034#comment-15673034 ] Kostas Kloudas commented on FLINK-2646: --- Yes, I agree. This is what I also meant in my comment above. > Rich functions should provide a method "closeAfterFailure()" > > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Liang Chen > Fix For: 1.0.0 > > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of cancellation or error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is then changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}}, the change would not be API > breaking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
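A minimal Java sketch of the proposal quoted above (illustrative only, not actual Flink code): the new hook gets a default implementation that falls back to {{close()}}, which is what keeps the change API-compatible, while the runtime would invoke {{close()}} after a regular finish and {{closeAfterFailure()}} after cancellation or failure.

{code}
// Sketch of the proposed addition; the class name and bodies are illustrative.
public abstract class RichFunctionSketch {

    /** Regular cleanup, invoked when the task completed successfully. */
    public abstract void close() throws Exception;

    /** Cleanup after cancellation or failure; by default identical to close(). */
    public void closeAfterFailure() throws Exception {
        close();
    }
}
{code}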
[jira] [Commented] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673030#comment-15673030 ] Kostas Kloudas commented on FLINK-2646: --- Yes, I think we are on the same page. This is what I also meant in my comment above. > Rich functions should provide a method "closeAfterFailure()" > > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Liang Chen > Fix For: 1.0.0 > > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of cancellation or error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is then changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}}, the change would not be API > breaking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Issue Comment Deleted] (FLINK-2646) Rich functions should provide a method "closeAfterFailure()"
[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-2646: -- Comment: was deleted (was: Yes, I think we are on the same page. This is what I also meant in my comment above.) > Rich functions should provide a method "closeAfterFailure()" > > > Key: FLINK-2646 > URL: https://issues.apache.org/jira/browse/FLINK-2646 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Liang Chen > Fix For: 1.0.0 > > > Right now, the {{close()}} method of rich functions is invoked in case of > proper completion, and in case of cancellation or error (to allow for > cleanup). > In certain cases, the user function needs to know why it is closed, whether > the task completed in a regular fashion, or was canceled/failed. > I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By > default, this method calls {{close()}}. The runtime is then changed to call > {{close()}} as part of the regular execution and {{closeAfterFailure()}} in > case of an irregular exit. > Because by default all cases call {{close()}}, the change would not be API > breaking. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5085) Execute CheckpointCoordinator's state discard calls asynchronously
[ https://issues.apache.org/jira/browse/FLINK-5085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15673026#comment-15673026 ] Xiaogang Shi commented on FLINK-5085: - Great, this is what I thought of in recent days. Our states are composed of thousands of files on HDFS. It takes a long time to delete them in sequence. A dedicated executor will help improve the performance. > Execute CheckpointCoordinator's state discard calls asynchronously > - > > Key: FLINK-5085 > URL: https://issues.apache.org/jira/browse/FLINK-5085 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0, 1.1.4 > > > Under certain circumstances, the {{CheckpointCoordinator}} discards pending > checkpoints or state handles. These discard operations can involve a blocking > IO operation if the underlying state handle refers to a file which has to be > deleted. In order not to block the calling thread, we should execute these > calls in a dedicated IO executor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2821: [MINOR][DOC] Fix incorrect URL
GitHub user mindprince opened a pull request: https://github.com/apache/flink/pull/2821 [MINOR][DOC] Fix incorrect URL You can merge this pull request into a Git repository by running: $ git pull https://github.com/mindprince/flink patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2821.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2821 commit 26cd26d7e81960f7ada770a069984ed3a3e76248 Author: Rohit Agarwal Date: 2016-11-17T00:49:57Z [MINOR][DOC] Fix incorrect URL --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-5085) Execute CheckpointCoordinator's state discard calls asynchronously
Till Rohrmann created FLINK-5085: Summary: Execute CheckpointCoordinator's state discard calls asynchronously Key: FLINK-5085 URL: https://issues.apache.org/jira/browse/FLINK-5085 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.1.3, 1.2.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.2.0, 1.1.4 Under certain circumstances, the {{CheckpointCoordinator}} discards pending checkpoints or state handles. These discard operations can involve a blocking IO operation if the underlying state handle refers to a file which has to be deleted. In order not to block the calling thread, we should execute these calls in a dedicated IO executor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
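To make the idea concrete, here is a small Java sketch (illustrative names only, not the actual {{CheckpointCoordinator}} code) of handing the potentially blocking discard calls to a dedicated I/O executor so that the calling thread returns immediately:

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncDiscardSketch {

    /** Stand-in for a state handle whose backing file(s) can be released. */
    public interface DiscardableState {
        void discard() throws Exception;
    }

    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    public void discardAsync(Iterable<DiscardableState> stateHandles) {
        for (DiscardableState handle : stateHandles) {
            ioExecutor.execute(() -> {
                try {
                    handle.discard(); // may block, e.g. deleting a file on HDFS
                } catch (Exception e) {
                    // best effort: report and continue with the remaining handles
                    System.err.println("Could not discard state: " + e.getMessage());
                }
            });
        }
    }
}
{code}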
[jira] [Closed] (FLINK-5063) State handles are not properly cleaned up for declined or expired checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5063. --- > State handles are not properly cleaned up for declined or expired checkpoints > - > > Key: FLINK-5063 > URL: https://issues.apache.org/jira/browse/FLINK-5063 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.2.0, 1.1.4 > > > In case a {{Checkpoint}} is declined or expires, the > {{CheckpointCoordinator}} will dispose the {{PendingCheckpoint}}. Disposing > the {{PendingCheckpoint}} entails that all so far registered > {{SubtaskStates}} of the acknowledged {{Tasks}} are discarded. However, all > late arriving acknowledge messages are simply ignored without properly > discarding the transmitted state handles. This can lead to a cluttering of > the checkpoint directory since the checkpoint files of late or unknown > acknowledge checkpoint messages are never deleted. > I propose to properly discard the state handles at the > {{CheckpointCoordinator}} when receiving a late acknowledge message or an > acknowledge message for an unknown {{ExecutionAttemptID}} belonging to the > job of the {{CheckpointCoordinator}}. However, checkpoint messages belonging > to a different job won't be handled and will simply be ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
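A hedged Java sketch of the behaviour the issue proposes (all names invented for illustration; the real fix is in the commits referenced in the resolution below): when an acknowledge message arrives for a checkpoint that is no longer pending, the transmitted state is discarded instead of silently dropped, so no orphaned checkpoint files remain.

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LateAckHandlingSketch {

    /** Stand-in for a transmitted state handle whose backing files can be released. */
    public interface StateHandle {
        void discard() throws Exception;
    }

    private final Map<Long, Object> pendingCheckpoints = new ConcurrentHashMap<>();

    public void onAcknowledge(long checkpointId, StateHandle transmittedState) {
        if (!pendingCheckpoints.containsKey(checkpointId)) {
            // Late or unknown acknowledge: the pending checkpoint was already
            // declined or expired, so release the state instead of leaking files.
            try {
                transmittedState.discard();
            } catch (Exception e) {
                System.err.println("Could not discard late checkpoint state: " + e);
            }
            return;
        }
        // ... the normal acknowledge path would register the state with the
        // pending checkpoint here ...
    }
}
{code}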
[jira] [Resolved] (FLINK-5063) State handles are not properly cleaned up for declined or expired checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5063. - Resolution: Fixed Fixed in - 1.1.4 via 4daf3bbc1e0251e1e84d799421dae9e3fa2363fc - 1.2.0 via 72b295b3b52dff2d0bc5b78881826e8936c370ff > State handles are not properly cleaned up for declined or expired checkpoints > - > > Key: FLINK-5063 > URL: https://issues.apache.org/jira/browse/FLINK-5063 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.2.0, 1.1.4 > > > In case a {{Checkpoint}} is declined or expires, the > {{CheckpointCoordinator}} will dispose the {{PendingCheckpoint}}. Disposing > the {{PendingCheckpoint}} entails that all so far registered > {{SubtaskStates}} of the acknowledged {{Tasks}} are discarded. However, all > late arriving acknowledge messages are simply ignored without properly > discarding the transmitted state handles. This can lead to a cluttering of > the checkpoint directory since the checkpoint files of late or unknown > acknowledge checkpoint messages are never deleted. > I propose to properly discard the state handles at the > {{CheckpointCoordinator}} when receiving a late acknowledge message or an > acknowledge message for an unknown {{ExecutionAttemptID}} belonging to the > job of the {{CheckpointCoordinator}}. However, checkpoint messages belonging > to a different job won't be handled and will simply be ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-5057) Cancellation timeouts are picked from wrong config
[ https://issues.apache.org/jira/browse/FLINK-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5057. - Resolution: Fixed Fixed in - 1.1.4 via ad3e674b26fb5766ffefda653701af5180d60413 - 1.2.0 via bf06a1cc786c0a7b8c8d446be01a63edf2cc0897 > Cancellation timeouts are picked from wrong config > -- > > Key: FLINK-5057 > URL: https://issues.apache.org/jira/browse/FLINK-5057 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.2.0, 1.1.4 > > > The cancellation timeouts are read from the Task configuration instead of the > TaskManager configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-5057) Cancellation timeouts are picked from wrong config
[ https://issues.apache.org/jira/browse/FLINK-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5057. --- > Cancellation timeouts are picked from wrong config > -- > > Key: FLINK-5057 > URL: https://issues.apache.org/jira/browse/FLINK-5057 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.2.0, 1.1.4 > > > The cancellation timeouts are read from the Task configuration instead of the > TaskManager configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2812: [FLINK-5063] Discard state handles of declined or ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2812 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2793: [FLINK-5057] [task] Pick cancellation timeout from...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2793 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5063) State handles are not properly cleaned up for declined or expired checkpoints
[ https://issues.apache.org/jira/browse/FLINK-5063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15672074#comment-15672074 ] ASF GitHub Bot commented on FLINK-5063: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2812 > State handles are not properly cleaned up for declined or expired checkpoints > - > > Key: FLINK-5063 > URL: https://issues.apache.org/jira/browse/FLINK-5063 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.1.3 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.2.0, 1.1.4 > > > In case a {{Checkpoint}} is declined or expires, the > {{CheckpointCoordinator}} will dispose the {{PendingCheckpoint}}. Disposing > the {{PendingCheckpoint}} entails that all so far registered > {{SubtaskStates}} of the acknowledged {{Tasks}} are discarded. However, all > late arriving acknowledge messages are simply ignored without properly > discarding the transmitted state handles. This can lead to a cluttering of > the checkpoint directory since the checkpoint files of late or unknown > acknowledge checkpoint messages are never deleted. > I propose to properly discard the state handles at the > {{CheckpointCoordinator}} when receiving a late acknowledge message or an > acknowledge message for an unknown {{ExecutionAttemptID}} belonging to the > job of the {{CheckpointCoordinator}}. However, checkpoint messages belonging > to a different job won't be handled and will simply be ignored. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5006) SystemProcessingTimeServiceTest.testTimerSorting fails
[ https://issues.apache.org/jira/browse/FLINK-5006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15672075#comment-15672075 ] ASF GitHub Bot commented on FLINK-5006: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2785 > SystemProcessingTimeServiceTest.testTimerSorting fails > -- > > Key: FLINK-5006 > URL: https://issues.apache.org/jira/browse/FLINK-5006 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Ufuk Celebi > Labels: test-stability > > {code} > testTimerSorting(org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest) > Time elapsed: 0.023 sec <<< FAILURE! > java.lang.AssertionError: expected:<1478173518115> but was:<1478173518122> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.testTimerSorting(SystemProcessingTimeServiceTest.java:298) > {code} > Failed in a private branch with unrelated changes (the test is very much self > contained). > https://s3.amazonaws.com/archive.travis-ci.org/jobs/172910645/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2785: [FLINK-5006] SystemProcessingTimeServiceTest.testT...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2785 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5057) Cancellation timeouts are picked from wrong config
[ https://issues.apache.org/jira/browse/FLINK-5057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15672073#comment-15672073 ] ASF GitHub Bot commented on FLINK-5057: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2793 > Cancellation timeouts are picked from wrong config > -- > > Key: FLINK-5057 > URL: https://issues.apache.org/jira/browse/FLINK-5057 > Project: Flink > Issue Type: Bug >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.2.0, 1.1.4 > > > The cancellation timeouts are read from the Task configuration instead of the > TaskManager configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5084) Replace Java Table API integration tests by unit tests
Fabian Hueske created FLINK-5084: Summary: Replace Java Table API integration tests by unit tests Key: FLINK-5084 URL: https://issues.apache.org/jira/browse/FLINK-5084 Project: Flink Issue Type: Task Components: Table API & SQL Reporter: Fabian Hueske Priority: Minor The Java Table API is a wrapper on top of the Scala Table API. Instead of operating directly with Expressions like the Scala API, the Java API accepts a String parameter which is parsed into Expressions. We could therefore replace the Java Table API ITCases with tests that check that the parsing step produces a valid logical plan. This could be done by creating two {{Table}} objects for an identical query, once with the Scala Expression API and once with the Java String API, and comparing the logical plans of both {{Table}} objects. Basically something like the following:
{code}
val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

val joinT1 = ds1.join(ds2).where('b === 'e).select('c, 'g)
val joinT2 = ds1.join(ds2).where("b = e").select("c, g")

val lPlan1 = joinT1.logicalPlan
val lPlan2 = joinT2.logicalPlan

Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88342301 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, DataStreamConvention} + +/** + * parser cross/outer apply + */ +class DataStreamCorrelateRule + extends ConverterRule( +classOf[LogicalCorrelate], +Convention.NONE, +DataStreamConvention.INSTANCE, +"DataStreamCorrelateRule") +{ + + override def matches(call: RelOptRuleCall): Boolean = { +val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate] +val right = join.getRight.asInstanceOf[RelSubset].getOriginal + +right match { + // right node is a table function + case scan: LogicalTableFunctionScan => true + // a filter is pushed above the table function + case filter: LogicalFilter => +filter.getInput.asInstanceOf[RelSubset].getOriginal + .isInstanceOf[LogicalTableFunctionScan] + case _ => false +} + } + + override def convert(rel: RelNode): RelNode = { +val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate] +val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) +val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE) +val right: RelNode = join.getInput(1) + +def convertToCorrelate(relNode: RelNode, condition: RexNode): DataStreamCorrelate = { --- End diff -- define `condition` as `Option[RexNode]` so we do not have to use `null` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339990 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction} +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} + +import scala.collection.JavaConversions._ + +/** + * cross/outer apply a user-defined table function + */ +trait FlinkCorrelate { + + private[flink] def functionBody(generator: CodeGenerator, + udtfTypeInfo: TypeInformation[Any], + rowType: RelDataType, + rexCall: RexCall, + condition: RexNode, + config: TableConfig, + joinType: SemiJoinType, + expectedType: Option[TypeInformation[Any]]): String = { + +val returnType = determineReturnType( + rowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs +val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs, + returnType, rowType.getFieldNames) + +val input2NullExprs = input2AccessExprs.map( + x => GeneratedExpression("null", "true", "", x.resultType)) +val outerResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2NullExprs, + returnType, rowType.getFieldNames) + +val call = generator.generateExpression(rexCall) +var body = call.code + + s""" + |scala.collection.Iterator iter = ${call.resultTerm}.getRowsIterator(); +""".stripMargin +if (joinType == SemiJoinType.INNER) { + // cross apply + body += +s""" + |if (iter.isEmpty()) { + | return; + |} +""".stripMargin +} else { --- End diff -- I think it would be safer to add an `else if (joinType == SemiJoinType.LEFT)` here and throw an exception in `else`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671952#comment-15671952 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345175 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataStreamCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataStreamRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + +val config = tableEnv.getConfig +val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val inputDS = inputNode.asInstanceOf[DataStreamRel] + .translateToPlan(tableEnv, Some(inputRowType(inputNode))) --- End diff -- I think we can replace `Some(inputRowType(inputNode))` by `Some(TypeConver
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671927#comment-15671927 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336697 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -162,24 +191,107 @@ object UserDefinedFunctionUtils { } /** +* Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses --- End diff -- If this method is also used for `TableFunction`, the docs should be adapted. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more eval functions. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overloaded. Blink will choose the best matching eval method > to call according to parameter types and number.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>   public Iterable<Word> eval(String str) {
>     if (str != null) {
>       for (String s : str.split(",")) {
>         collect(new Word(s, s.length()));
>       }
>     }
>   }
> }
>
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)")
>
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>   .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>   .select("a, b, word, length")
>
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>   .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>   .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88348205 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate} + +/** + * parser cross/outer apply --- End diff -- Replace by "Rule to convert a LogicalCorrelate into a DataSetCorrelate.". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88337239 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import java.lang.reflect.Method + +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, TableException, UnresolvedException} +import org.apache.flink.api.table.expressions.{Attribute, Expression, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.functions.TableFunction +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._ +import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl +import org.apache.flink.api.table.validate.ValidationFailure + +import scala.collection.JavaConversions._ + +/** + * General expression for unresolved user-defined table function calls. + */ +case class UnresolvedTableFunctionCall(functionName: String, args: Seq[Expression]) + extends LogicalNode { + + override def output: Seq[Attribute] = +throw UnresolvedException("Invalid call to output on UnresolvedTableFunctionCall") + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = +throw UnresolvedException("Invalid call to construct on UnresolvedTableFunctionCall") + + override private[flink] def children: Seq[LogicalNode] = +throw UnresolvedException("Invalid call to children on UnresolvedTableFunctionCall") +} + +/** + * LogicalNode for calling a user-defined table functions. 
+ * @param tableFunction table function to be called (might be overloaded) + * @param parameters actual parameters + * @param alias output fields renaming + * @tparam T type of returned table + */ +case class TableFunctionCall[T: TypeInformation]( + tableFunction: TableFunction[T], + parameters: Seq[Expression], + alias: Option[Array[String]]) extends UnaryNode { + + private var table: LogicalNode = _ + override def child: LogicalNode = table + + def setChild(child: LogicalNode): TableFunctionCall[T] = { +table = child +this + } + + private val resultType: TypeInformation[T] = +if (tableFunction.getResultType == null) { + implicitly[TypeInformation[T]] +} else { + tableFunction.getResultType +} + + private val fieldNames: Array[String] = +if (alias.isEmpty) { + getFieldAttribute[T](resultType)._1 +} else { + alias.get +} + private val fieldTypes: Array[TypeInformation[_]] = getFieldAttribute[T](resultType)._2 + + /** +* Assigns an alias for this table function returned fields that the following `select()` clause +* can refer to. +* +* @param aliasList alias for this window +* @return this table function +*/ + def as(aliasList: Expression*): TableFunctionCall[T] = { +if (aliasList == null) { + return this +} +if (aliasList.length != fieldNames.length) { + failValidation("Aliasing not match number of fields") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("Alias only accept name expressions as arguments") +} else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray + TableFunctionCall(tableFunction, parameters, Some(names)) +} + } + + override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map { +case (n, t) =>
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671932#comment-15671932 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88335748 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". + * + * User-defined functions must have a default constructor and must be instantiable during runtime. + * + * By default the result type of an evaluation method is determined by Flink's type extraction + * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more + * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type + * can be manually defined by overriding [[getResultType()]]. + * + * Internally, the Table/SQL API code generation works with primitive values as much as possible. + * If a user-defined table function should not introduce much overhead during runtime, it is + * recommended to declare parameters and result types as primitive types instead of their boxed + * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. + * + * @tparam T The type of the output row + */ +abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction { + + private val rows: ListBuffer[T] = new ListBuffer + + /** +* Emit an output row +* +* @param row the output row +*/ + protected def collect(row: T): Unit = { +// cache rows for now, maybe immediately process them further +rows += row + } + + + @Internal --- End diff -- Remove `@Internal` annotation. Annotations are only used in specific Maven modules (flink-core, flink-java, flink-scala, ...) 
but not yet in flink-table > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from t
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction} +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} + +import scala.collection.JavaConversions._ + +/** + * cross/outer apply a user-defined table function + */ +trait FlinkCorrelate { + + private[flink] def functionBody(generator: CodeGenerator, --- End diff -- can you change the constructor parameter wrapping and indention to be similar to `DataSetCorrelate`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336779 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -162,24 +191,107 @@ object UserDefinedFunctionUtils { } /** +* Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses +* [[TypeExtractor]] as default return type inference. +*/ + def getResultType( +tableFunction: TableFunction[_], +signature: Array[Class[_]]) + : TypeInformation[_] = { +// find method for signature +val evalMethod = tableFunction.getEvalMethods + .find(m => signature.sameElements(m.getParameterTypes)) + .getOrElse(throw new ValidationException("Given signature is invalid.")) + +val userDefinedTypeInfo = tableFunction.getResultType +if (userDefinedTypeInfo != null) { + userDefinedTypeInfo +} else { + try { +TypeExtractor.getForClass(evalMethod.getReturnType) + } catch { +case ite: InvalidTypesException => + throw new ValidationException( +s"Return type of table function '$this' cannot be " + + s"automatically determined. Please provide type information manually.") + } +} + } + + /** * Returns the return type of the evaluation method matching the given signature. */ def getResultTypeClass( - scalarFunction: ScalarFunction, + function: EvaluableFunction, --- End diff -- I think `UserDefinedFunction` would be better. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671947#comment-15671947 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88343933 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. + */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataSetRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 --- End diff -- Not that is would actually matter at the moment, but why are you adding a constant here. Shouldn't it be something like `* 1.5` instead? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. 
It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFun
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88338123 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -611,6 +612,130 @@ class Table( } /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def crossApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def crossApply(udtf: String): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) --- End diff -- `Cross Apply` should be `Outer Apply`. Please check the complete docs for this method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
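For reference, a minimal sketch of the two apply variants discussed in this comment, based on the MySplitUDTF example quoted in the diff; `table` is assumed to be an existing Table with fields 'a, 'b, 'c registered in a TableEnvironment, and the method names follow this PR's Scala Table API.
{code}
// Minimal sketch based on the examples in this PR.
class MySplitUDTF extends TableFunction[String] {
  def eval(str: String): Unit = {
    str.split("#").foreach(collect)
  }
}

val split = new MySplitUDTF()

// Cross Apply: like an inner join against the table function; input rows for
// which the UDTF emits no rows are dropped.
table.crossApply(split('c).as('s)).select('a, 'b, 'c, 's)

// Outer Apply: like a left outer join against the table function; input rows
// without UDTF output are kept and padded with nulls.
table.outerApply(split('c).as('s)).select('a, 'b, 'c, 's)
{code}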
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88347146 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.functions + +import java.lang.reflect.{Modifier, Method} +import org.apache.flink.api.table.ValidationException + +/** + * User-defined function has eval methods can extend this trait to reuse the same logic, such as: + * [[ScalarFunction]] and [[TableFunction]]. + */ +trait EvaluableFunction { + + private lazy val evalMethods = checkAndExtractEvalMethods() + private lazy val signatures = evalMethods.map(_.getParameterTypes) + + /** +* Extracts evaluation methods and throws a [[ValidationException]] if no implementation +* can be found. +*/ + private def checkAndExtractEvalMethods(): Array[Method] = { +val methods = getClass + .getDeclaredMethods + .filter { m => +val modifiers = m.getModifiers +m.getName == "eval" && Modifier.isPublic(modifiers) && !Modifier.isAbstract(modifiers) + } + +if (methods.isEmpty) { + throw new ValidationException(s"Table function class '$this' does not implement at least " + --- End diff -- If this method is also used for `ScalarFunction`, the exception message should be adapted accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
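A minimal sketch of how the message could be phrased if the trait stays shared between `ScalarFunction` and `TableFunction`; the wording is only illustrative, not the merged text.
{code}
// inside checkAndExtractEvalMethods(), assuming the trait remains shared by
// ScalarFunction and TableFunction
if (methods.isEmpty) {
  throw new ValidationException(
    s"Function class '${getClass.getCanonicalName}' does not implement at least " +
    s"one method named 'eval' which is public and not abstract.")
}
{code}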
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345307 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataSetRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val config = tableEnv.getConfig +val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val inputDS = inputNode.asInstanceOf[DataSetRel] + .translateToPlan(tableEnv, Some(inputRowType(inputNode))) --- End diff -- I think we can replace `Some(inputRowType(inputNode))` by `Some(TypeConverter.DEFAULT_ROW_TYPE)` (similar as in `DataSetAggregate.translateToPlan()` Then we can also remove the method `FlinkCorrelate.inputRowType`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, pl
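A sketch of the suggested simplification, assuming `TypeConverter.DEFAULT_ROW_TYPE` is used in the same way as in `DataSetAggregate.translateToPlan()`; this reflects the reviewer's proposal, not the merged code.
{code}
// pass the default row type downstream instead of deriving it per input node,
// which would make FlinkCorrelate.inputRowType unnecessary
val inputDS = inputNode.asInstanceOf[DataSetRel]
  .translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE))
{code}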
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88348343 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.schema + +import java.lang.reflect.{Method, Type} +import java.util + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.TableFunction +import org.apache.calcite.schema.impl.ReflectiveFunctionBase +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.{FlinkTypeFactory, TableException} + +class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T], --- End diff -- Please add a brief description of the class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
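A possible class comment, inferred only from the imports and usage visible in this diff (Calcite's `TableFunction`, `ReflectiveFunctionBase`, and the Flink `TypeInformation` parameter); the wording is a suggestion, not the merged text.
{code}
/**
 * Calcite [[org.apache.calcite.schema.TableFunction]] wrapper around a Flink
 * user-defined table function. It derives the row type (field names and types)
 * of the function's results from the given [[TypeInformation]], so that calls
 * to the UDTF can be validated and planned by Calcite.
 */
{code}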
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88347669 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". + * + * User-defined functions must have a default constructor and must be instantiable during runtime. + * + * By default the result type of an evaluation method is determined by Flink's type extraction + * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more + * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type + * can be manually defined by overriding [[getResultType()]]. + * + * Internally, the Table/SQL API code generation works with primitive values as much as possible. + * If a user-defined table function should not introduce much overhead during runtime, it is + * recommended to declare parameters and result types as primitive types instead of their boxed + * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. 
+ * + * @tparam T The type of the output row + */ +abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction { + + private val rows: ListBuffer[T] = new ListBuffer + + /** +* Emit an output row +* +* @param row the output row +*/ + protected def collect(row: T): Unit = { +// cache rows for now, maybe immediately process them further +rows += row + } + + + @Internal + def getRowsIterator = rows.toIterator + + @Internal + def clear() = rows.clear() + + // this method will not be called, because we need to register multiple sql function at one time + override private[flink] final def createSqlFunction( + name: String, + typeFactory: FlinkTypeFactory) +: SqlFunction = { +throw new UnsupportedOperationException("this method should not be called") --- End diff -- Why is this method not necessary for `TableFunction`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
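The class comment quoted above mentions overriding `getResultType()` when type extraction is insufficient. Below is a minimal illustrative sketch; the `Word` case class and `SplitUDTF` names are made up for the example and are not part of the PR.
{code}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation

case class Word(word: String, length: Int)

class SplitUDTF extends TableFunction[Word] {
  def eval(str: String): Unit = {
    str.split(",").foreach(s => collect(Word(s, s.length)))
  }

  // only needed when Flink's type extraction cannot determine the result type
  override def getResultType: TypeInformation[Word] = createTypeInformation[Word]
}
{code}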
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88337706 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import java.lang.reflect.Method + +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, TableException, UnresolvedException} +import org.apache.flink.api.table.expressions.{Attribute, Expression, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.functions.TableFunction +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._ +import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl +import org.apache.flink.api.table.validate.ValidationFailure + +import scala.collection.JavaConversions._ + +/** + * General expression for unresolved user-defined table function calls. + */ +case class UnresolvedTableFunctionCall(functionName: String, args: Seq[Expression]) + extends LogicalNode { + + override def output: Seq[Attribute] = +throw UnresolvedException("Invalid call to output on UnresolvedTableFunctionCall") + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = +throw UnresolvedException("Invalid call to construct on UnresolvedTableFunctionCall") + + override private[flink] def children: Seq[LogicalNode] = +throw UnresolvedException("Invalid call to children on UnresolvedTableFunctionCall") +} + +/** + * LogicalNode for calling a user-defined table functions. 
+ * @param tableFunction table function to be called (might be overloaded) + * @param parameters actual parameters + * @param alias output fields renaming + * @tparam T type of returned table + */ +case class TableFunctionCall[T: TypeInformation]( + tableFunction: TableFunction[T], + parameters: Seq[Expression], + alias: Option[Array[String]]) extends UnaryNode { + + private var table: LogicalNode = _ + override def child: LogicalNode = table + + def setChild(child: LogicalNode): TableFunctionCall[T] = { +table = child +this + } + + private val resultType: TypeInformation[T] = +if (tableFunction.getResultType == null) { + implicitly[TypeInformation[T]] +} else { + tableFunction.getResultType +} + + private val fieldNames: Array[String] = +if (alias.isEmpty) { + getFieldAttribute[T](resultType)._1 +} else { + alias.get +} + private val fieldTypes: Array[TypeInformation[_]] = getFieldAttribute[T](resultType)._2 + + /** +* Assigns an alias for this table function returned fields that the following `select()` clause +* can refer to. +* +* @param aliasList alias for this window +* @return this table function +*/ + def as(aliasList: Expression*): TableFunctionCall[T] = { +if (aliasList == null) { + return this +} +if (aliasList.length != fieldNames.length) { + failValidation("Aliasing not match number of fields") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("Alias only accept name expressions as arguments") +} else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray + TableFunctionCall(tableFunction, parameters, Some(names)) +} + } + + override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map { +case (n, t) =>
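Usage sketch for the aliasing logic quoted above, following the `SplitStringUDTF` example from the issue description (a UDTF whose result type has two fields); `table` is assumed to be an existing Table with fields 'a, 'b, 'c.
{code}
val split = new SplitStringUDTF()

// rename the UDTF's two output fields to 'w and 'l so that select() can refer
// to them; the number of aliases must match the number of result fields
table.crossApply(split('c) as ('w, 'l))
  .select('a, 'b, 'w, 'l)
{code}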
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88343373 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. + */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, --- End diff -- use `Option[RexNode]` for `condition` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
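A sketch of what the suggested signature change could look like; every parameter except `condition` is elided, and the body only illustrates how an `Option[RexNode]` would replace null checks.
{code}
import org.apache.calcite.rex.RexNode

class DataSetCorrelate(/* ..., */ condition: Option[RexNode] /* , ... */) {

  // downstream code pattern-matches instead of checking for null
  private def conditionToString: String = condition match {
    case Some(cond) => cond.toString   // a filter is applied to the correlated rows
    case None       => "true"          // no filter
  }
}
{code}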
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345505 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.schema + +import java.lang.reflect.{Method, Type} +import java.util + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.TableFunction +import org.apache.calcite.schema.impl.ReflectiveFunctionBase +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.{FlinkTypeFactory, TableException} + +class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T], --- End diff -- Please indent parameters as in `DataSetCorrelate`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
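For reference, the wrapping style being asked for (one constructor parameter per line, indented relative to the class name); the remaining parameters are not visible in the quoted diff and are therefore elided.
{code}
class FlinkTableFunctionImpl[T](
    val typeInfo: TypeInformation[T]
    /* further constructor parameters, one per line, indented like this */) {
  // ...
}
{code}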
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671936#comment-15671936 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction} +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} + +import scala.collection.JavaConversions._ + +/** + * cross/outer apply a user-defined table function + */ +trait FlinkCorrelate { + + private[flink] def functionBody(generator: CodeGenerator, --- End diff -- can you change the constructor parameter wrapping and indention to be similar to `DataSetCorrelate`? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > ta
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671957#comment-15671957 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88347146 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.functions + +import java.lang.reflect.{Modifier, Method} +import org.apache.flink.api.table.ValidationException + +/** + * User-defined function has eval methods can extend this trait to reuse the same logic, such as: + * [[ScalarFunction]] and [[TableFunction]]. + */ +trait EvaluableFunction { + + private lazy val evalMethods = checkAndExtractEvalMethods() + private lazy val signatures = evalMethods.map(_.getParameterTypes) + + /** +* Extracts evaluation methods and throws a [[ValidationException]] if no implementation +* can be found. +*/ + private def checkAndExtractEvalMethods(): Array[Method] = { +val methods = getClass + .getDeclaredMethods + .filter { m => +val modifiers = m.getModifiers +m.getName == "eval" && Modifier.isPublic(modifiers) && !Modifier.isAbstract(modifiers) + } + +if (methods.isEmpty) { + throw new ValidationException(s"Table function class '$this' does not implement at least " + --- End diff -- If this method is also used for `ScalarFunction`, the exception message should be adapted accordingly. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671950#comment-15671950 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345505 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.schema + +import java.lang.reflect.{Method, Type} +import java.util + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.TableFunction +import org.apache.calcite.schema.impl.ReflectiveFunctionBase +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.{FlinkTypeFactory, TableException} + +class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T], --- End diff -- Please indent parameters as in `DataSetCorrelate`. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671924#comment-15671924 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336271 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". + * + * User-defined functions must have a default constructor and must be instantiable during runtime. + * + * By default the result type of an evaluation method is determined by Flink's type extraction + * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more + * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type + * can be manually defined by overriding [[getResultType()]]. + * + * Internally, the Table/SQL API code generation works with primitive values as much as possible. + * If a user-defined table function should not introduce much overhead during runtime, it is + * recommended to declare parameters and result types as primitive types instead of their boxed + * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. 
+ * + * @tparam T The type of the output row + */ +abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction { + + private val rows: ListBuffer[T] = new ListBuffer + + /** +* Emit an output row +* +* @param row the output row +*/ + protected def collect(row: T): Unit = { +// cache rows for now, maybe immediately process them further +rows += row + } + + + @Internal + def getRowsIterator = rows.toIterator + + @Internal + def clear() = rows.clear() + + // this method will not be called, because we need to register multiple sql function at one time + override private[flink] final def createSqlFunction( + name: String, + typeFactory: FlinkTypeFactory) +: SqlFunction = { +throw new UnsupportedOperationException("this method should not be called") + } + + // -- + + /** +* Returns the result type of the evaluation method with a given signature. +* +* This method needs to be overriden in case Flink's type extraction facilities are not +* sufficient to extract the [[TypeInformation]] based on the return type of the evaluation +* method. Flink's type extraction facilities can handle basic types or +* simple POJOs but might be wrong for more complex, custom, or composite types. +* +* @return [[TypeInformation]] of result type or null if Flink should determine the type +*/ + def getResultType: TypeInformation[T] = null
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88335427 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. --- End diff -- change to `"... works on zero, one, or multiple scalar values as input ..."`. It does not need to be a complete row, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
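An illustrative example for the suggested wording: an evaluation method takes scalar values (possibly several, possibly none) rather than a whole row, and may emit multiple rows. The `RangeUDTF` name and logic are made up for the example.
{code}
class RangeUDTF extends TableFunction[Int] {
  // two scalar input values, zero or more output rows
  def eval(from: Int, to: Int): Unit = {
    var i = from
    while (i <= to) {
      collect(i)
      i += 1
    }
  }
}
{code}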
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671934#comment-15671934 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336779 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -162,24 +191,107 @@ object UserDefinedFunctionUtils { } /** +* Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses +* [[TypeExtractor]] as default return type inference. +*/ + def getResultType( +tableFunction: TableFunction[_], +signature: Array[Class[_]]) + : TypeInformation[_] = { +// find method for signature +val evalMethod = tableFunction.getEvalMethods + .find(m => signature.sameElements(m.getParameterTypes)) + .getOrElse(throw new ValidationException("Given signature is invalid.")) + +val userDefinedTypeInfo = tableFunction.getResultType +if (userDefinedTypeInfo != null) { + userDefinedTypeInfo +} else { + try { +TypeExtractor.getForClass(evalMethod.getReturnType) + } catch { +case ite: InvalidTypesException => + throw new ValidationException( +s"Return type of table function '$this' cannot be " + + s"automatically determined. Please provide type information manually.") + } +} + } + + /** * Returns the return type of the evaluation method matching the given signature. */ def getResultTypeClass( - scalarFunction: ScalarFunction, + function: EvaluableFunction, --- End diff -- I think `UserDefinedFunction` would be better. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... 
> table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671955#comment-15671955 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88346931 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala --- @@ -0,0 +1,62 @@ +/* --- End diff -- +1 I think it's a good idea to move these methods. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88343933 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. + */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataSetRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 --- End diff -- Not that is would actually matter at the moment, but why are you adding a constant here. Shouldn't it be something like `* 1.5` instead? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
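A sketch of the alternative raised in this comment: scale the input cardinality by a constant factor instead of adding a constant. The factor 1.5 is taken from the review comment itself and is a placeholder heuristic, not a measured value.
{code}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
  val rowCnt = metadata.getRowCount(getInput) * 1.5
  planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
}
{code}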
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671940#comment-15671940 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88337810 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -17,20 +17,30 @@ */ package org.apache.flink.api.table.plan.logical +import java.lang.reflect.Method + +import com.google.common.collect.Sets import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rel.core.CorrelationId --- End diff -- Most of the added imports are unused. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88346931 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/EvaluableFunction.scala --- @@ -0,0 +1,62 @@ +/* --- End diff -- +1 I think it's a good idea to move these methods. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88346802 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala --- @@ -47,13 +52,50 @@ class FunctionCatalog { sqlFunctions += sqlFunction } + /** Register multiple sql functions at one time. The functions has the same name. **/ + def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = { +if (functions.nonEmpty) { + sqlFunctions --= sqlFunctions.filter(_.getName == functions.head.getName) + sqlFunctions ++= functions +} + } + def getSqlOperatorTable: SqlOperatorTable = ChainedSqlOperatorTable.of( new BasicOperatorTable(), new ListSqlOperatorTable(sqlFunctions) ) /** +* Lookup table function and create an TableFunctionCall if we find a match. +*/ + def lookupTableFunction[T](name: String, children: Seq[Expression]): TableFunctionCall[T] = { +val funcClass = functionBuilders + .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined function: $name")) +funcClass match { + // user-defined table function call + case tf if classOf[TableFunction[T]].isAssignableFrom(tf) => + Try(UserDefinedFunctionUtils.instantiate(tf.asInstanceOf[Class[TableFunction[T]]])) match { + case Success(tableFunction) => { +val clazz: Type = tableFunction.getClass.getGenericSuperclass +val generic = clazz match { + case cls: ParameterizedType => cls.getActualTypeArguments.toSeq.head + case _ => throw new TableException( +"New TableFunction classes need to inherit from TableFunction class," + + " and statement the generic type.") +} +implicit val typeInfo: TypeInformation[T] = TypeExtractor.createTypeInfo(generic) + .asInstanceOf[TypeInformation[T]] +TableFunctionCall(tableFunction, children, None) + } + case Failure(e) => throw ValidationException(e.getMessage) +} + case _ => +throw ValidationException("Unsupported table function.") --- End diff -- I think this exception message could be improved. It is throw if the registered method does not implement the `TableFunction` interface. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
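A minimal sketch of a clearer message for the case the reviewer points out (the registered class does not implement `TableFunction`); the exact wording is an assumption, while `ValidationException` and the `name` variable come from the hunk above.
{code}
case _ =>
  throw ValidationException(
    s"Function '$name' does not implement the TableFunction interface " +
      "and can therefore not be used in a cross/outer apply.")
{code}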
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88344259 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataStreamCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataStreamRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { --- End diff -- please indent similar as the constructor parameters of this class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
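A sketch of the indentation the reviewer asks for, mirroring how the constructor parameters of the class are laid out; the exact column width follows the usual Flink Scala style and is an assumption, and the method body is unchanged and elided.
{code}
override def translateToPlan(
    tableEnv: StreamTableEnvironment,
    expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
  // body unchanged from the PR
  ???
}
{code}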
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336518 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -135,6 +138,32 @@ object UserDefinedFunctionUtils { } /** +* Returns eval method matching the given signature of [[TypeInformation]]. +*/ + def getEvalMethod( +function: EvaluableFunction, --- End diff -- wouldn't `Class[_]` be more generic than necessary? `UserDefinedFunction` would work as well, no? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
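A sketch of the signature the reviewer hints at, widening the parameter to `UserDefinedFunction`. The second parameter, return type, and body are illustrative placeholders rather than the PR's implementation, and the imports are assumed to be those already present in the file.
{code}
def getEvalMethod(
    function: UserDefinedFunction,
    signature: Seq[TypeInformation[_]]): Option[Method] = {
  // pick an eval method whose arity matches the requested signature
  function.getClass.getMethods.find { m =>
    m.getName == "eval" && m.getParameterTypes.length == signature.length
  }
}
{code}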
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88342273 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate} + +/** + * parser cross/outer apply + */ +class DataSetCorrelateRule + extends ConverterRule( + classOf[LogicalCorrelate], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetCorrelateRule") + { + +override def matches(call: RelOptRuleCall): Boolean = { + val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate] + val right = join.getRight.asInstanceOf[RelSubset].getOriginal + + + right match { +// right node is a table function +case scan: LogicalTableFunctionScan => true +// a filter is pushed above the table function +case filter: LogicalFilter => + filter.getInput.asInstanceOf[RelSubset].getOriginal +.isInstanceOf[LogicalTableFunctionScan] +case _ => false + } +} + +override def convert(rel: RelNode): RelNode = { + val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) + val right: RelNode = join.getInput(1) + + def convertToCorrelate(relNode: RelNode, condition: RexNode): DataSetCorrelate = { --- End diff -- define `condition` as `Option[RexNode]` so we do not have to use `null` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
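A sketch of how the rule could thread the condition through as an `Option`, per the reviewer's comment. The body of `convertToCorrelate` is not part of the quoted hunk, so this reconstruction is an assumption; `DataSetCorrelate`'s constructor would take `Option[RexNode]` as well.
{code}
def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataSetCorrelate =
  relNode match {
    case filter: LogicalFilter =>
      // remember the pushed-down filter condition instead of passing null later
      convertToCorrelate(
        filter.getInput.asInstanceOf[RelSubset].getOriginal,
        Some(filter.getCondition))

    case scan: LogicalTableFunctionScan =>
      new DataSetCorrelate(
        rel.getCluster,
        traitSet,
        convInput,
        scan,
        condition,
        rel.getRowType,
        join.getRowType,
        join.getJoinType,
        description)
  }

// entry point inside convert(), with no condition seen yet:
// convertToCorrelate(join.getInput(1), None)
{code}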
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345712 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala --- @@ -47,13 +52,50 @@ class FunctionCatalog { sqlFunctions += sqlFunction } + /** Register multiple sql functions at one time. The functions has the same name. **/ + def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = { --- End diff -- Please check that all functions have the same name. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
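A minimal sketch of the check the reviewer asks for, added to the `registerSqlFunctions` body quoted above; using `ValidationException` for the failure is an assumption.
{code}
def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
  if (functions.nonEmpty) {
    val name = functions.head.getName
    // all overloads registered together must share one name
    if (!functions.forall(_.getName == name)) {
      throw ValidationException("All SqlFunctions registered together must have the same name.")
    }
    sqlFunctions --= sqlFunctions.filter(_.getName == name)
    sqlFunctions ++= functions
  }
}
{code}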
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671942#comment-15671942 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339652 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction} +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} + +import scala.collection.JavaConversions._ + +/** + * cross/outer apply a user-defined table function + */ +trait FlinkCorrelate { + + private[flink] def functionBody(generator: CodeGenerator, + udtfTypeInfo: TypeInformation[Any], + rowType: RelDataType, + rexCall: RexCall, + condition: RexNode, + config: TableConfig, + joinType: SemiJoinType, + expectedType: Option[TypeInformation[Any]]): String = { + +val returnType = determineReturnType( + rowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs +val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs, + returnType, rowType.getFieldNames) + +val input2NullExprs = input2AccessExprs.map( --- End diff -- I think `input2NullExpr` and `outerResultExpr` can be moved into the `else` branch of the `if (joinType == SemiJoinType.INNER)` condition. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. 
In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > publi
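To make the suggestion concrete: a hedged sketch of moving the outer-apply-only expressions into the else branch. `buildInnerBody`, `buildOuterBody`, and `toNullLiteral` are hypothetical placeholders for the generated-code snippets that are not part of the quoted hunk.
{code}
val crossResultExpr = generator.generateResultExpression(
  input1AccessExprs ++ input2AccessExprs, returnType, rowType.getFieldNames)

if (joinType == SemiJoinType.INNER) {
  // cross apply: only the matched-row expression is needed
  buildInnerBody(crossResultExpr)
} else {
  // outer apply: the null-padded expressions are only used in this branch,
  // so they can be created here instead of before the if/else
  val input2NullExprs = input2AccessExprs.map(toNullLiteral)
  val outerResultExpr = generator.generateResultExpression(
    input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames)
  buildOuterBody(crossResultExpr, outerResultExpr)
}
{code}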
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671925#comment-15671925 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339010 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -611,6 +612,130 @@ class Table( } /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def crossApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def crossApply(udtf: String): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.outerApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def outerApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.LEFT_OUTER) + } + + /** +* The Outer Apply returns all the rows from the outer table (table on the left of the Apply +* operator), and rows that do not matches the condition from the table-valued function (which +* is on the right side of the operator), NULL values are displayed. +* +* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function. 
+* +* Example: +* +* {{{ +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def outerApply(udtf: String): Table = { +applyInternal(udtf, JoinType.LEFT_OUTER) + } + + private def applyInternal(udtfString: String, joinType: JoinType): Table = { +val node = ExpressionParser.parseLogicalNode(udtfString) +var alias: Option[Seq[Expression]] = None +val functionCall = node match { + case AliasNode(aliasList, child) => +alias = Some(aliasList) +child + case _ => node +} + +functionCall match { + case call @ UnresolvedTableFunctionCall(name, args) => +val udtfCall = tableEnv.getFunctionCatalog.lookupTableFunction(name, args) +if (alias.isDefined) { + applyInternal(udtfCall.as(alias.get: _*), joinType) +} else { + applyInternal(udtfCall, joinType) +} + case _ => throw new TableException("Cross/Outer Apply only accept TableFunction") +} + } + + private def applyInternal(node: LogicalNode, joinType: JoinType): Table = { +node match { + case udtf: TableFunctionCall[_] => +udtf.setChild(this.logicalPlan) +new Table( +
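For readers skimming the diff above: a short usage sketch of the two apply variants, modeled on the Scaladoc examples in the hunk; only the combination into one snippet is new.
{code}
class MySplitUDTF extends TableFunction[String] {
  def eval(str: String): Unit = {
    str.split("#").foreach(collect)
  }
}

val split = new MySplitUDTF()

// inner ("cross") apply: only rows for which the UDTF produces output are kept
table.crossApply(split('c).as('s)).select('a, 'b, 'c, 's)

// outer apply: left rows without UDTF output are kept and padded with NULLs
table.outerApply(split('c).as('s)).select('a, 'b, 'c, 's)

// string-based variant of the same call
table.crossApply("split(c) as (s)").select("a, b, c, s")
{code}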
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671959#comment-15671959 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88348205 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate} + +/** + * parser cross/outer apply --- End diff -- Replace by "Rule to convert a LogicalCorrelate into a DataSetCorrelate.". > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
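The reviewer's suggested Scaladoc, applied to the class header quoted above; the wording is taken from the comment, only the [[...]] links are a small addition.
{code}
/**
  * Rule to convert a [[LogicalCorrelate]] into a [[DataSetCorrelate]].
  */
class DataSetCorrelateRule
  extends ConverterRule(
    classOf[LogicalCorrelate],
    Convention.NONE,
    DataSetConvention.INSTANCE,
    "DataSetCorrelateRule")
{code}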
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671946#comment-15671946 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88344133 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataSetRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { --- End diff -- please indent similar as the constructor parameters of this class > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671928#comment-15671928 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r87304840 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -360,7 +370,8 @@ case class Join( left: LogicalNode, right: LogicalNode, joinType: JoinType, -condition: Option[Expression]) extends BinaryNode { +condition: Option[Expression], +corId: Option[CorrelationId] = None) extends BinaryNode { --- End diff -- CorrelationId is a Calcite class. At this point we do all validation ourselves. Can we replace it? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
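One possible direction for the reviewer's question, purely illustrative and not a decision from the PR: keep the logical operator free of Calcite types and create the `CorrelationId` only when the plan is translated to Calcite.
{code}
case class Join(
    left: LogicalNode,
    right: LogicalNode,
    joinType: JoinType,
    condition: Option[Expression],
    correlated: Boolean = false) extends BinaryNode {

  // during translation to a Calcite RelNode, a correlation id would be created
  // on demand, e.g. relBuilder.getCluster.createCorrel() if correlated is true
}
{code}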
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671930#comment-15671930 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r87303197 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala --- @@ -305,6 +305,16 @@ object ScalarFunctions { ) ) +// user-defined table function +case tsf: TableSqlFunction => --- End diff -- Should we rename this class (`ScalarFunctions`) because it contains also `TableFunction` logic? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345175 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataStreamCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataStreamRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + +val config = tableEnv.getConfig +val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val inputDS = inputNode.asInstanceOf[DataStreamRel] + .translateToPlan(tableEnv, Some(inputRowType(inputNode))) --- End diff -- I think we can replace `Some(inputRowType(inputNode))` by `Some(TypeConverter.DEFAULT_ROW_TYPE)` (similar as in `DataSetAggregate.translateToPlan()` Then we can also remove the method `FlinkCorrelate.inputRowType`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub
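A sketch of the suggested simplification in `translateToPlan()`; with the default row type requested explicitly, `FlinkCorrelate.inputRowType` would no longer be needed.
{code}
val inputDS = inputNode.asInstanceOf[DataStreamRel]
  .translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE))
{code}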
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88325623 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/UserDefinedTableFunctionITCase.java --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.batch; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.BatchTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.api.table.functions.TableFunction; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + + +@RunWith(Parameterized.class) +public class UserDefinedTableFunctionITCase extends TableProgramsTestBase { + + public UserDefinedTableFunctionITCase(TestExecutionMode mode, TableConfigMode configMode){ + super(mode, configMode); + } + + + @Test + public void testUDTF() throws Exception { --- End diff -- rename to `testUDTFWithCrossApply`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88343322 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with cross apply a user defined table function. + */ +class DataStreamCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, --- End diff -- use `Option[RexNode]` for `condition` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88337810 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -17,20 +17,30 @@ */ package org.apache.flink.api.table.plan.logical +import java.lang.reflect.Method + +import com.google.common.collect.Sets import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rel.core.CorrelationId --- End diff -- Most of the added imports are unused. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88348227 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, DataStreamConvention} + +/** + * parser cross/outer apply --- End diff -- Replace by "Rule to convert a LogicalCorrelate into a DataStreamCorrelate.". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339010 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -611,6 +612,130 @@ class Table( } /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def crossApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def crossApply(udtf: String): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.outerApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def outerApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.LEFT_OUTER) + } + + /** +* The Outer Apply returns all the rows from the outer table (table on the left of the Apply +* operator), and rows that do not matches the condition from the table-valued function (which +* is on the right side of the operator), NULL values are displayed. +* +* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function. 
+* +* Example: +* +* {{{ +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def outerApply(udtf: String): Table = { +applyInternal(udtf, JoinType.LEFT_OUTER) + } + + private def applyInternal(udtfString: String, joinType: JoinType): Table = { +val node = ExpressionParser.parseLogicalNode(udtfString) +var alias: Option[Seq[Expression]] = None +val functionCall = node match { + case AliasNode(aliasList, child) => +alias = Some(aliasList) +child + case _ => node +} + +functionCall match { + case call @ UnresolvedTableFunctionCall(name, args) => +val udtfCall = tableEnv.getFunctionCatalog.lookupTableFunction(name, args) +if (alias.isDefined) { + applyInternal(udtfCall.as(alias.get: _*), joinType) +} else { + applyInternal(udtfCall, joinType) +} + case _ => throw new TableException("Cross/Outer Apply only accept TableFunction") +} + } + + private def applyInternal(node: LogicalNode, joinType: JoinType): Table = { +node match { + case udtf: TableFunctionCall[_] => +udtf.setChild(this.logicalPlan) +new Table( + tableEnv, + Join(this.logicalPlan, udtf.validate(tableEnv), joinType, None, + Some(relBuilder.getCluster.createCorrel())).validate(tableEnv)) --- End diff -- We kept Calcite code so far out of our `LogicalNode
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671956#comment-15671956 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88347669 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". + * + * User-defined functions must have a default constructor and must be instantiable during runtime. + * + * By default the result type of an evaluation method is determined by Flink's type extraction + * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more + * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type + * can be manually defined by overriding [[getResultType()]]. + * + * Internally, the Table/SQL API code generation works with primitive values as much as possible. + * If a user-defined table function should not introduce much overhead during runtime, it is + * recommended to declare parameters and result types as primitive types instead of their boxed + * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. 
+ * + * @tparam T The type of the output row + */ +abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction { + + private val rows: ListBuffer[T] = new ListBuffer + + /** +* Emit an output row +* +* @param row the output row +*/ + protected def collect(row: T): Unit = { +// cache rows for now, maybe immediately process them further +rows += row + } + + + @Internal + def getRowsIterator = rows.toIterator + + @Internal + def clear() = rows.clear() + + // this method will not be called, because we need to register multiple sql function at one time + override private[flink] final def createSqlFunction( + name: String, + typeFactory: FlinkTypeFactory) +: SqlFunction = { +throw new UnsupportedOperationException("this method should not be called") --- End diff -- Why is this method not necessary for `TableFunction`? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and retu
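A small usage sketch of the buffering contract shown in the hunk above: `collect()` appends to an internal buffer, `getRowsIterator` and `clear()` drain and reset it. The driver code around it is an assumption, not the PR's actual runner.
{code}
val udtf = new MySplitUDTF()          // as defined in the Scaladoc examples earlier
udtf.eval("a#b#c")                    // eval() calls collect(), which buffers three rows
udtf.getRowsIterator.foreach(println) // the runtime would forward these rows downstream
udtf.clear()                          // reset the buffer before processing the next input row
{code}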
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671951#comment-15671951 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345712 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala --- @@ -47,13 +52,50 @@ class FunctionCatalog { sqlFunctions += sqlFunction } + /** Register multiple sql functions at one time. The functions has the same name. **/ + def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = { --- End diff -- Please check that all functions have the same name. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
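For illustration, a minimal sketch of the name check requested in the comment above, assuming `sqlFunctions` is the mutable buffer shown earlier in the hunk and reusing the Table API's `ValidationException`; the message and exact behaviour are placeholders, not the merged implementation:

{code}
/** Registers multiple SQL functions at once. All functions must share the same name. */
def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
  if (functions.nonEmpty) {
    val name = functions.head.getName
    if (!functions.forall(_.getName == name)) {
      throw new ValidationException(
        "All SQL functions registered together must have the same name.")
    }
    sqlFunctions ++= functions
  }
}
{code}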
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88344133 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataSetRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { --- End diff -- please indent similar as the constructor parameters of this class --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
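For reference, the indentation asked for above would roughly mirror the constructor parameters of this class, e.g. (body elided):

{code}
override def translateToPlan(
    tableEnv: BatchTableEnvironment,
    expectedType: Option[TypeInformation[Any]])
  : DataSet[Any] = {
  // existing translation logic, unchanged
  ???
}
{code}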
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339652 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction} +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} + +import scala.collection.JavaConversions._ + +/** + * cross/outer apply a user-defined table function + */ +trait FlinkCorrelate { + + private[flink] def functionBody(generator: CodeGenerator, + udtfTypeInfo: TypeInformation[Any], + rowType: RelDataType, + rexCall: RexCall, + condition: RexNode, + config: TableConfig, + joinType: SemiJoinType, + expectedType: Option[TypeInformation[Any]]): String = { + +val returnType = determineReturnType( + rowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs +val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs, + returnType, rowType.getFieldNames) + +val input2NullExprs = input2AccessExprs.map( --- End diff -- I think `input2NullExpr` and `outerResultExpr` can be moved into the `else` branch of the `if (joinType == SemiJoinType.INNER)` condition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
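A sketch of the restructuring suggested above, with the outer-apply-only expressions computed inside the non-INNER branch; the elided parts stand for the existing generated code and are not part of the suggestion:

{code}
if (joinType == SemiJoinType.INNER) {
  // cross apply: only the cross result expression is needed
  // ... existing INNER branch ...
} else {
  // outer apply: the null-padded expressions are only needed here
  val input2NullExprs = input2AccessExprs.map(
    x => GeneratedExpression("null", "true", "", x.resultType))
  val outerResultExpr = generator.generateResultExpression(
    input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames)
  // ... existing outer branch, using outerResultExpr ...
}
{code}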
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671939#comment-15671939 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88335427 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. --- End diff -- change to `"... works on zero, one, or multiple scalar values as input ..."`. It does not need to be a complete row, right? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671938#comment-15671938 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88338243 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -611,6 +612,130 @@ class Table( } /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def crossApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def crossApply(udtf: String): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.outerApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def outerApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.LEFT_OUTER) + } + + /** +* The Outer Apply returns all the rows from the outer table (table on the left of the Apply +* operator), and rows that do not matches the condition from the table-valued function (which +* is on the right side of the operator), NULL values are displayed. +* +* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") --- End diff -- `crossApply` should be `outerApply`. Please check the whole docs for this method. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. 
It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink
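For reference, the corrected Scaladoc example for this `outerApply` variant would presumably read along the following lines, reusing the split UDTF from the surrounding examples:

{code}
val split = new MySplitUDTF()
// outer apply keeps left rows even when the UDTF emits no rows; missing fields are null
table.outerApply(split('c).as('s)).select('a, 'b, 'c, 's)
{code}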
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671943#comment-15671943 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88339990 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction} +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} + +import scala.collection.JavaConversions._ + +/** + * cross/outer apply a user-defined table function + */ +trait FlinkCorrelate { + + private[flink] def functionBody(generator: CodeGenerator, + udtfTypeInfo: TypeInformation[Any], + rowType: RelDataType, + rexCall: RexCall, + condition: RexNode, + config: TableConfig, + joinType: SemiJoinType, + expectedType: Option[TypeInformation[Any]]): String = { + +val returnType = determineReturnType( + rowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs +val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs, + returnType, rowType.getFieldNames) + +val input2NullExprs = input2AccessExprs.map( + x => GeneratedExpression("null", "true", "", x.resultType)) +val outerResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2NullExprs, + returnType, rowType.getFieldNames) + +val call = generator.generateExpression(rexCall) +var body = call.code + + s""" + |scala.collection.Iterator iter = ${call.resultTerm}.getRowsIterator(); +""".stripMargin +if (joinType == SemiJoinType.INNER) { + // cross apply + body += +s""" + |if (iter.isEmpty()) { + | return; + |} +""".stripMargin +} else { --- End diff -- I think it would be safer to add an `else if (joinType == 
SemiJoinType.LEFT)` here and throw an exception in `else`. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a sing
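A sketch of the stricter branching proposed above; the exception type is an assumption (any suitable Table API exception would do):

{code}
if (joinType == SemiJoinType.INNER) {
  // cross apply
  // ... existing INNER handling ...
} else if (joinType == SemiJoinType.LEFT) {
  // outer apply
  // ... existing LEFT OUTER handling ...
} else {
  throw new TableException(s"Unsupported SemiJoinType: $joinType")
}
{code}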
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671931#comment-15671931 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88325623 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/UserDefinedTableFunctionITCase.java --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.batch; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.BatchTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.api.table.functions.TableFunction; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + + +@RunWith(Parameterized.class) +public class UserDefinedTableFunctionITCase extends TableProgramsTestBase { + + public UserDefinedTableFunctionITCase(TestExecutionMode mode, TableConfigMode configMode){ + super(mode, configMode); + } + + + @Test + public void testUDTF() throws Exception { --- End diff -- rename to `testUDTFWithCrossApply`? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) >
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671958#comment-15671958 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88348343 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTableFunctionImpl.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.schema + +import java.lang.reflect.{Method, Type} +import java.util + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.TableFunction +import org.apache.calcite.schema.impl.ReflectiveFunctionBase +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.{FlinkTypeFactory, TableException} + +class FlinkTableFunctionImpl[T](val typeInfo: TypeInformation[T], --- End diff -- Please add a brief description of the class. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
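One possible wording for the requested class description, inferred only from the class's imports and type parameter (a suggestion, not the author's text):

{code}
/**
 * Calcite [[TableFunction]] backed by Flink [[TypeInformation]]: it derives the
 * Calcite row type of the function's result table from the given type information.
 */
{code}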
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671953#comment-15671953 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88343373 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. + */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, --- End diff -- use `Option[RexNode]` for `condition` > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. 
Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c)
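A sketch of the suggested signature change; the call-site handling shown is illustrative only:

{code}
// constructor parameter as suggested:
condition: Option[RexNode],

// call sites can then make the "no filter" case explicit, e.g.:
condition match {
  case Some(filter) => // generate code that applies the filter to the correlated rows
  case None         => // no condition: forward all correlated rows
}
{code}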
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671933#comment-15671933 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88338123 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -611,6 +612,130 @@ class Table( } /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def crossApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def crossApply(udtf: String): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) --- End diff -- `Cross Apply` should be `Outer Apply`. Please check the complete docs for this method. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671935#comment-15671935 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336518 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -135,6 +138,32 @@ object UserDefinedFunctionUtils { } /** +* Returns eval method matching the given signature of [[TypeInformation]]. +*/ + def getEvalMethod( +function: EvaluableFunction, --- End diff -- would't `Class[_]` be more generic than necessary. `UserDefinedFunction` would work as well, no? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
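If the parameter were widened as discussed, the method might take a shape like the following; the second parameter and return type are assumptions based on the doc comment, not the actual signature:

{code}
/** Returns the eval method matching the given signature of [[TypeInformation]]. */
def getEvalMethod(
    function: UserDefinedFunction,
    signature: Seq[TypeInformation[_]])
  : Option[Method] = {
  // look up the matching public "eval" method via reflection, as before
  ???
}
{code}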
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671929#comment-15671929 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88335539 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". --- End diff -- Maybe add a short code example for `eval()` here? > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#
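The kind of short example the comment asks for is presumably close to the split function used elsewhere in this PR, e.g.:

{code}
class MySplitUDTF extends TableFunction[String] {
  // each eval() call may emit zero or more rows via collect()
  def eval(str: String): Unit = {
    str.split("#").foreach(collect)
  }
}
{code}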
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671937#comment-15671937 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88337239 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import java.lang.reflect.Method + +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, TableException, UnresolvedException} +import org.apache.flink.api.table.expressions.{Attribute, Expression, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.functions.TableFunction +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._ +import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl +import org.apache.flink.api.table.validate.ValidationFailure + +import scala.collection.JavaConversions._ + +/** + * General expression for unresolved user-defined table function calls. + */ +case class UnresolvedTableFunctionCall(functionName: String, args: Seq[Expression]) + extends LogicalNode { + + override def output: Seq[Attribute] = +throw UnresolvedException("Invalid call to output on UnresolvedTableFunctionCall") + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = +throw UnresolvedException("Invalid call to construct on UnresolvedTableFunctionCall") + + override private[flink] def children: Seq[LogicalNode] = +throw UnresolvedException("Invalid call to children on UnresolvedTableFunctionCall") +} + +/** + * LogicalNode for calling a user-defined table functions. 
+ * @param tableFunction table function to be called (might be overloaded) + * @param parameters actual parameters + * @param alias output fields renaming + * @tparam T type of returned table + */ +case class TableFunctionCall[T: TypeInformation]( + tableFunction: TableFunction[T], + parameters: Seq[Expression], + alias: Option[Array[String]]) extends UnaryNode { + + private var table: LogicalNode = _ + override def child: LogicalNode = table + + def setChild(child: LogicalNode): TableFunctionCall[T] = { +table = child +this + } + + private val resultType: TypeInformation[T] = +if (tableFunction.getResultType == null) { + implicitly[TypeInformation[T]] +} else { + tableFunction.getResultType +} + + private val fieldNames: Array[String] = +if (alias.isEmpty) { + getFieldAttribute[T](resultType)._1 +} else { + alias.get +} + private val fieldTypes: Array[TypeInformation[_]] = getFieldAttribute[T](resultType)._2 + + /** +* Assigns an alias for this table function returned fields that the following `select()` clause +* can refer to. +* +* @param aliasList alias for this window +* @return this table function +*/ + def as(aliasList: Expression*): TableFunctionCall[T] = { +if (aliasList == null) { + return this +} +if (aliasList.length != fieldNames.length) { + failValidation("Aliasing not match number of fields") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("Alias only accept name expressions as arguments") +} else { + val names = aliasList
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671949#comment-15671949 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88344259 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataStreamCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataStreamRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val funcRel = unwrap(scan) +val rexCall = funcRel.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { --- End diff -- please indent similar as the constructor parameters of this class. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Rep
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671945#comment-15671945 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88343322 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * Flink RelNode which matches along with cross apply a user defined table function. + */ +class DataStreamCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, --- End diff -- use `Option[RexNode]` for `condition` > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. 
Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDT
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671948#comment-15671948 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88345307 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.{RexNode, RexCall} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.plan.nodes.FlinkCorrelate +import org.apache.flink.api.table.typeutils.TypeConverter._ + +/** + * Flink RelNode which matches along with cross apply a user defined table function. 
+ */ +class DataSetCorrelate( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +scan: LogicalTableFunctionScan, +condition: RexNode, +relRowType: RelDataType, +joinRowType: RelDataType, +joinType: SemiJoinType, +ruleDescription: String) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkCorrelate + with DataSetRel { + override def deriveRowType() = relRowType + + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { +val rowCnt = metadata.getRowCount(getInput) + 10 +planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataSetCorrelate( + cluster, + traitSet, + inputs.get(0), + scan, + condition, + relRowType, + joinRowType, + joinType, + ruleDescription) + } + + override def toString: String = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +correlateToString(rexCall, sqlFunction) + } + + override def explainTerms(pw: RelWriter): RelWriter = { +val rexCall = scan.getCall.asInstanceOf[RexCall] +val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] +super.explainTerms(pw) + .item("lateral", correlateToString(rexCall, sqlFunction)) + .item("select", selectToString(relRowType)) + } + + + override def translateToPlan(tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val config = tableEnv.getConfig +val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + +val inputDS = inputNode.asInstanceOf[DataSetRel] + .translateToPlan(tableEnv, Some(inputRowType(inputNode))) --- End diff -- I think we can replace `Some(inputRowType(inputNode))` by `Some(TypeConverter.DEFAULT_ROW_TYPE)` (similar as in `DataSetAggregate.translateToPlan()` Then we can also remove the method `Fl
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671941#comment-15671941 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88342301 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, DataStreamConvention} + +/** + * parser cross/outer apply + */ +class DataStreamCorrelateRule + extends ConverterRule( +classOf[LogicalCorrelate], +Convention.NONE, +DataStreamConvention.INSTANCE, +"DataStreamCorrelateRule") +{ + + override def matches(call: RelOptRuleCall): Boolean = { +val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate] +val right = join.getRight.asInstanceOf[RelSubset].getOriginal + +right match { + // right node is a table function + case scan: LogicalTableFunctionScan => true + // a filter is pushed above the table function + case filter: LogicalFilter => +filter.getInput.asInstanceOf[RelSubset].getOriginal + .isInstanceOf[LogicalTableFunctionScan] + case _ => false +} + } + + override def convert(rel: RelNode): RelNode = { +val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate] +val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) +val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE) +val right: RelNode = join.getInput(1) + +def convertToCorrelate(relNode: RelNode, condition: RexNode): DataStreamCorrelate = { --- End diff -- define `condition` as `Option[RexNode]` so we do not have to use `null` > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. 
It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.
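To make the `Option[RexNode]` suggestion in the comment above concrete, here is a rough sketch of how the correlate rule could resolve the table function scan together with an optional filter condition instead of passing `null` around. This is only an illustration of the idea (the helper name is made up and the fragment is meant to live inside the rule's `convert()` method), not the code that was eventually merged:

{code}
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.logical.{LogicalFilter, LogicalTableFunctionScan}
import org.apache.calcite.rex.RexNode

// Hypothetical helper: return the scan plus an Option for the pushed-down
// filter condition, so no nullable RexNode is threaded through.
def extractScanAndCondition(right: RelNode): (LogicalTableFunctionScan, Option[RexNode]) =
  right match {
    // right node is a table function, no condition
    case scan: LogicalTableFunctionScan =>
      (scan, None)
    // a filter is pushed above the table function, keep its predicate as Some(...)
    case filter: LogicalFilter =>
      val scan = filter.getInput.asInstanceOf[RelSubset]
        .getOriginal.asInstanceOf[LogicalTableFunctionScan]
      (scan, Some(filter.getCondition))
  }
{code}

Following the related review comments on the RelNodes, the `DataStreamCorrelate` and `DataSetCorrelate` constructors would then also take `condition: Option[RexNode]`.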
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671926#comment-15671926 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88337706 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import java.lang.reflect.Method + +import org.apache.calcite.rel.logical.LogicalTableFunctionScan +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, TableException, UnresolvedException} +import org.apache.flink.api.table.expressions.{Attribute, Expression, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.functions.TableFunction +import org.apache.flink.api.table.functions.utils.TableSqlFunction +import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._ +import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl +import org.apache.flink.api.table.validate.ValidationFailure + +import scala.collection.JavaConversions._ + +/** + * General expression for unresolved user-defined table function calls. + */ +case class UnresolvedTableFunctionCall(functionName: String, args: Seq[Expression]) + extends LogicalNode { + + override def output: Seq[Attribute] = +throw UnresolvedException("Invalid call to output on UnresolvedTableFunctionCall") + + override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = +throw UnresolvedException("Invalid call to construct on UnresolvedTableFunctionCall") + + override private[flink] def children: Seq[LogicalNode] = +throw UnresolvedException("Invalid call to children on UnresolvedTableFunctionCall") +} + +/** + * LogicalNode for calling a user-defined table functions. 
+ * @param tableFunction table function to be called (might be overloaded) + * @param parameters actual parameters + * @param alias output fields renaming + * @tparam T type of returned table + */ +case class TableFunctionCall[T: TypeInformation]( + tableFunction: TableFunction[T], + parameters: Seq[Expression], + alias: Option[Array[String]]) extends UnaryNode { + + private var table: LogicalNode = _ + override def child: LogicalNode = table + + def setChild(child: LogicalNode): TableFunctionCall[T] = { +table = child +this + } + + private val resultType: TypeInformation[T] = +if (tableFunction.getResultType == null) { + implicitly[TypeInformation[T]] +} else { + tableFunction.getResultType +} + + private val fieldNames: Array[String] = +if (alias.isEmpty) { + getFieldAttribute[T](resultType)._1 +} else { + alias.get +} + private val fieldTypes: Array[TypeInformation[_]] = getFieldAttribute[T](resultType)._2 + + /** +* Assigns an alias for this table function returned fields that the following `select()` clause +* can refer to. +* +* @param aliasList alias for this window +* @return this table function +*/ + def as(aliasList: Expression*): TableFunctionCall[T] = { +if (aliasList == null) { + return this +} +if (aliasList.length != fieldNames.length) { + failValidation("Aliasing not match number of fields") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("Alias only accept name expressions as arguments") +} else { + val names = aliasList
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671944#comment-15671944 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88342273 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetCorrelate} + +/** + * parser cross/outer apply + */ +class DataSetCorrelateRule + extends ConverterRule( + classOf[LogicalCorrelate], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetCorrelateRule") + { + +override def matches(call: RelOptRuleCall): Boolean = { + val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate] + val right = join.getRight.asInstanceOf[RelSubset].getOriginal + + + right match { +// right node is a table function +case scan: LogicalTableFunctionScan => true +// a filter is pushed above the table function +case filter: LogicalFilter => + filter.getInput.asInstanceOf[RelSubset].getOriginal +.isInstanceOf[LogicalTableFunctionScan] +case _ => false + } +} + +override def convert(rel: RelNode): RelNode = { + val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) + val right: RelNode = join.getInput(1) + + def convertToCorrelate(relNode: RelNode, condition: RexNode): DataSetCorrelate = { --- End diff -- define `condition` as `Option[RexNode]` so we do not have to use `null` > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. 
It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. > {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > coll
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671954#comment-15671954 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88346802 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala --- @@ -47,13 +52,50 @@ class FunctionCatalog { sqlFunctions += sqlFunction } + /** Register multiple sql functions at one time. The functions has the same name. **/ + def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = { +if (functions.nonEmpty) { + sqlFunctions --= sqlFunctions.filter(_.getName == functions.head.getName) + sqlFunctions ++= functions +} + } + def getSqlOperatorTable: SqlOperatorTable = ChainedSqlOperatorTable.of( new BasicOperatorTable(), new ListSqlOperatorTable(sqlFunctions) ) /** +* Lookup table function and create an TableFunctionCall if we find a match. +*/ + def lookupTableFunction[T](name: String, children: Seq[Expression]): TableFunctionCall[T] = { +val funcClass = functionBuilders + .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined function: $name")) +funcClass match { + // user-defined table function call + case tf if classOf[TableFunction[T]].isAssignableFrom(tf) => + Try(UserDefinedFunctionUtils.instantiate(tf.asInstanceOf[Class[TableFunction[T]]])) match { + case Success(tableFunction) => { +val clazz: Type = tableFunction.getClass.getGenericSuperclass +val generic = clazz match { + case cls: ParameterizedType => cls.getActualTypeArguments.toSeq.head + case _ => throw new TableException( +"New TableFunction classes need to inherit from TableFunction class," + + " and statement the generic type.") +} +implicit val typeInfo: TypeInformation[T] = TypeExtractor.createTypeInfo(generic) + .asInstanceOf[TypeInformation[T]] +TableFunctionCall(tableFunction, children, None) + } + case Failure(e) => throw ValidationException(e.getMessage) +} + case _ => +throw ValidationException("Unsupported table function.") --- End diff -- I think this exception message could be improved. It is throw if the registered method does not implement the `TableFunction` interface. > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(spli
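As a sketch of the message improvement suggested above, the fallback case in `lookupTableFunction` could name the function and state the actual requirement. The wording below is only a suggestion; `name` and `funcClass` are the values already defined in the surrounding method shown in the diff:

{code}
case _ =>
  throw ValidationException(
    s"The function '$name' cannot be used as a table function: the registered class " +
    s"${funcClass.getCanonicalName} does not extend TableFunction.")
{code}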
[jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671960#comment-15671960 ] ASF GitHub Bot commented on FLINK-4469: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88348227 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan} +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamCorrelate, DataStreamConvention} + +/** + * parser cross/outer apply --- End diff -- Replace by "Rule to convert a LogicalCorrelate into a DataStreamCorrelate.". > Add support for user defined table function in Table API & SQL > -- > > Key: FLINK-4469 > URL: https://issues.apache.org/jira/browse/FLINK-4469 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Normal user-defined functions, such as concat(), take in a single input row > and output a single output row. In contrast, table-generating functions > transform a single input row to multiple output rows. It is very useful in > some cases, such as look up in HBase by rowkey and return one or more rows. > Adding a user defined table function should: > 1. inherit from UDTF class with specific generic type T > 2. define one or more evel function. > NOTE: > 1. the eval method must be public and non-static. > 2. the generic type T is the row type returned by table function. Because of > Java type erasure, we can’t extract T from the Iterable. > 3. use {{collect(T)}} to emit table row > 4. eval method can be overload. Blink will choose the best match eval method > to call according to parameter types and number. 
> {code} > public class Word { > public String word; > public Integer length; > } > public class SplitStringUDTF extends UDTF { > public Iterable eval(String str) { > if (str != null) { > for (String s : str.split(",")) { > collect(new Word(s, s.length())); > } > } > } > } > // in SQL > tableEnv.registerFunction("split", new SplitStringUDTF()) > tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS > t(w,l)") > // in Java Table API > tableEnv.registerFunction("split", new SplitStringUDTF()) > // rename split table columns to “w” and “l” > table.crossApply("split(c) as (w, l)") > .select("a, b, w, l") > // without renaming, we will use the origin field names in the POJO/case/... > table.crossApply("split(c)") > .select("a, b, word, length") > // in Scala Table API > val split = new SplitStringUDTF() > table.crossApply(split('c) as ('w, 'l)) > .select('a, 'b, 'w, 'l) > // outerApply for outer join to a UDTF > table.outerApply(split('c)) > .select('a, 'b, 'word, 'length) > {code} > See [1] for more information about UDTF design. > [1] > https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit# -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336697 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -162,24 +191,107 @@ object UserDefinedFunctionUtils { } /** +* Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses --- End diff -- If this method is also used for `TableFunction`, the docs should be adapted. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88338243 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -611,6 +612,130 @@ class Table( } /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def crossApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") +* }}} +*/ + def crossApply(udtf: String): Table = { +applyInternal(udtf, JoinType.INNER) + } + + /** +* The Cross Apply returns rows form the outer table (table on the left of the Apply operator) +* that produces matching values from the table-valued function (which is on the right side of +* the operator). +* +* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* class MySplitUDTF extends TableFunction[String] { +* def eval(str: String): Unit = { +* str.split("#").foreach(collect) +* } +* } +* +* val split = new MySplitUDTF() +* table.outerApply(split('c).as('s)).select('a,'b,'c,'s) +* }}} +*/ + def outerApply(udtf: TableFunctionCall[_]): Table = { +applyInternal(udtf, JoinType.LEFT_OUTER) + } + + /** +* The Outer Apply returns all the rows from the outer table (table on the left of the Apply +* operator), and rows that do not matches the condition from the table-valued function (which +* is on the right side of the operator), NULL values are displayed. +* +* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function. +* +* Example: +* +* {{{ +* val split = new MySplitUDTF() +* table.crossApply("split('c') as (s)").select("a, b, c, s") --- End diff -- `crossApply` should be `outerApply`. Please check the whole docs for this method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
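Since the review comment concerns the scaladoc examples for `crossApply` and `outerApply`, a small usage sketch of the intended difference may help. It reuses the `MySplitUDTF` from the scaladoc above, and `table` stands for any table with fields `a`, `b`, `c`; this is illustrative only:

{code}
val split = new MySplitUDTF()

// crossApply: inner-join semantics, rows of `table` whose UDTF call emits
// nothing are dropped from the result
table.crossApply(split('c).as('s))
  .select('a, 'b, 'c, 's)

// outerApply: left-outer-join semantics, such rows are kept and the UDTF
// columns are filled with nulls -- this is the method whose scaladoc example
// should say outerApply, as pointed out in the review comment
table.outerApply(split('c).as('s))
  .select('a, 'b, 'c, 's)
{code}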
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88336271 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". + * + * User-defined functions must have a default constructor and must be instantiable during runtime. + * + * By default the result type of an evaluation method is determined by Flink's type extraction + * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more + * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type + * can be manually defined by overriding [[getResultType()]]. + * + * Internally, the Table/SQL API code generation works with primitive values as much as possible. + * If a user-defined table function should not introduce much overhead during runtime, it is + * recommended to declare parameters and result types as primitive types instead of their boxed + * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. 
+ * + * @tparam T The type of the output row + */ +abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction { + + private val rows: ListBuffer[T] = new ListBuffer + + /** +* Emit an output row +* +* @param row the output row +*/ + protected def collect(row: T): Unit = { +// cache rows for now, maybe immediately process them further +rows += row + } + + + @Internal + def getRowsIterator = rows.toIterator + + @Internal + def clear() = rows.clear() + + // this method will not be called, because we need to register multiple sql function at one time + override private[flink] final def createSqlFunction( + name: String, + typeFactory: FlinkTypeFactory) +: SqlFunction = { +throw new UnsupportedOperationException("this method should not be called") + } + + // -- + + /** +* Returns the result type of the evaluation method with a given signature. +* +* This method needs to be overriden in case Flink's type extraction facilities are not +* sufficient to extract the [[TypeInformation]] based on the return type of the evaluation +* method. Flink's type extraction facilities can handle basic types or +* simple POJOs but might be wrong for more complex, custom, or composite types. +* +* @return [[TypeInformation]] of result type or null if Flink should determine the type +*/ + def getResultType: TypeInformation[T] = null + + /** +* Returns [[TypeInformation]] about the operands of the evaluation method with a given +* signature. +* +* In order to perform operand type inference in SQL (especially when NULL is used) it might be +
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r87304840 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -360,7 +370,8 @@ case class Join( left: LogicalNode, right: LogicalNode, joinType: JoinType, -condition: Option[Expression]) extends BinaryNode { +condition: Option[Expression], +corId: Option[CorrelationId] = None) extends BinaryNode { --- End diff -- CorrelationId is a Calcite class. At this point we do all validation ourselves. Can we replace it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r87303197 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala --- @@ -305,6 +305,16 @@ object ScalarFunctions { ) ) +// user-defined table function +case tsf: TableSqlFunction => --- End diff -- Should we rename this class (`ScalarFunctions`) because it also contains `TableFunction` logic? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---

[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88335748 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". + * + * User-defined functions must have a default constructor and must be instantiable during runtime. + * + * By default the result type of an evaluation method is determined by Flink's type extraction + * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more + * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type + * can be manually defined by overriding [[getResultType()]]. + * + * Internally, the Table/SQL API code generation works with primitive values as much as possible. + * If a user-defined table function should not introduce much overhead during runtime, it is + * recommended to declare parameters and result types as primitive types instead of their boxed + * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long. + * + * @tparam T The type of the output row + */ +abstract class TableFunction[T] extends UserDefinedFunction with EvaluableFunction { + + private val rows: ListBuffer[T] = new ListBuffer + + /** +* Emit an output row +* +* @param row the output row +*/ + protected def collect(row: T): Unit = { +// cache rows for now, maybe immediately process them further +rows += row + } + + + @Internal --- End diff -- Remove `@Internal` annotation. Annotations are only used in specific Maven modules (flink-core, flink-java, flink-scala, ...) but not yet in flink-table --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2653: [FLINK-4469] [table] Add support for user defined ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2653#discussion_r88335539 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.functions + +import org.apache.calcite.sql.SqlFunction +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.table.{ValidationException, FlinkTypeFactory} + +import scala.collection.mutable.ListBuffer + +/** + * Base class for a user-defined table function (UDTF). A user-defined table functions works on + * one row as input and returns multiple rows as output. + * + * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation + * method. An evaluation method must be declared publicly and named "eval". Evaluation methods + * can also be overloaded by implementing multiple methods named "eval". --- End diff -- Maybe add a short code example for `eval()` here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
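For reference, the kind of short `eval()` example asked for above could look like the following (the class name is made up for illustration, consistent with the scaladoc elsewhere in the PR):

{code}
class SplitFunction extends TableFunction[String] {

  // each input value may emit zero, one, or many rows via collect()
  def eval(str: String): Unit = {
    str.split("#").foreach(collect)
  }

  // eval() can be overloaded; the matching method is chosen by the
  // parameter types and count of the call
  def eval(str: String, separator: String): Unit = {
    str.split(separator).foreach(collect)
  }
}
{code}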
[jira] [Commented] (FLINK-5083) Race condition in Rolling/Bucketing Sink pending files cleanup
[ https://issues.apache.org/jira/browse/FLINK-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671880#comment-15671880 ] Kostas Kloudas commented on FLINK-5083: --- Thanks for reporting it! There is a pending pull request here: https://github.com/apache/flink/pull/2797 that removes the deletion altogether. The reason is that deletion of lingering files does not play well with rescaling, which re-shuffles the different state of individual tasks. Given that this PR is about to be merged, I suppose that this issue will be resolved. In addition, I also have another PR for the RollingSink ready to open as soon as the aforementioned one gets merged. > Race condition in Rolling/Bucketing Sink pending files cleanup > -- > > Key: FLINK-5083 > URL: https://issues.apache.org/jira/browse/FLINK-5083 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Cliff Resnick > > In both Open and Restore methods there is code that: > 1. gets a recursive listing from baseDir > 2. iterates listing and name checks filenames based on subtaskIndex and other > criteria to find pending or in-progress files. If found, delete. > The problem is that the recursive listing gets all files for all > subtaskIndexes. The race error is when #hasNext is called as part of the > iteration, a hidden existence check is made on the "next" file, which was > deleted by another task after-listing but pre-iteration, so an error is > thrown and the job fails. > Depending on the number of pending files, this condition may outlast the > number of job retries, each failing on a different file. > A solution would be to use #listStatus instead. The Hadoop FileSystem supports a > PathFilter in its #listStatus calls, but not in the recursive #listFiles > call. The cleanup is performed from the baseDir so the recursive listing > would have to be in Flink. > This touches on another issue. Over time, the directory listing is bound to > get very large, and re-listing everything from the baseDir may get > increasingly expensive, especially if the Fs is S3. Maybe we can have a > Bucketer callback to return a list of cleanup root directories based on the > current file? I'm guessing most people are using time-based bucketing, so > there's only so much of a period where cleanup will matter. If so, then this > would solve for the above recursive listing problem. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
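The `#listStatus` idea from the issue description can be sketched roughly as follows. The method and parameter names (`subtaskIndex`, `pendingSuffix`, `inProgressSuffix`) are placeholders for whatever the sink actually uses, so this is an illustration of the approach rather than the eventual fix (the linked PR #2797 removes the deletion instead):

{code}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

// Hypothetical per-bucket cleanup: a non-recursive listStatus with a PathFilter
// so each subtask only lists (and deletes) its own pending/in-progress files,
// avoiding the lazy recursive listFiles iterator whose hidden existence checks
// race with deletes issued by other subtasks.
def cleanupOwnFiles(
    fs: FileSystem,
    bucketDir: Path,
    subtaskIndex: Int,
    pendingSuffix: String,
    inProgressSuffix: String): Unit = {

  val ownLeftovers = new PathFilter {
    override def accept(path: Path): Boolean = {
      val name = path.getName
      name.contains(s"-$subtaskIndex-") &&
        (name.endsWith(pendingSuffix) || name.endsWith(inProgressSuffix))
    }
  }

  fs.listStatus(bucketDir, ownLeftovers).foreach { status =>
    fs.delete(status.getPath, false)
  }
}
{code}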
[GitHub] flink issue #2740: [FLINK-4964] [ml]
Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740 @greghogan Ok, I've pushed the code with my tests and some modifications in the mapping. @thvasilo It seems to work perfectly! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4964) FlinkML - Add StringIndexer
[ https://issues.apache.org/jira/browse/FLINK-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671741#comment-15671741 ] ASF GitHub Bot commented on FLINK-4964: --- Github user tfournier314 commented on the issue: https://github.com/apache/flink/pull/2740 @greghogan Ok I've pushed the code with my tests and some modifications in mapping @thvasilo It seems to work perfectly! > FlinkML - Add StringIndexer > --- > > Key: FLINK-4964 > URL: https://issues.apache.org/jira/browse/FLINK-4964 > Project: Flink > Issue Type: New Feature >Reporter: Thomas FOURNIER >Priority: Minor > > Add StringIndexer as described here: > http://spark.apache.org/docs/latest/ml-features.html#stringindexer > This will be added in package preprocessing of FlinkML -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2804: [FLINK-5067] Make Flink compile with 1.8 Java comp...
Github user melentye commented on a diff in the pull request: https://github.com/apache/flink/pull/2804#discussion_r88332568 --- Diff: pom.xml --- @@ -96,7 +96,7 @@ under the License. 1.7.7 18.0 2.3.7 - 1.7 + 1.7 --- End diff -- I actually prefer java.version for it's conciseness and I share the conservative tune in regards to unnecessary pom.xml changes. But in this case I did it on purpose, long explanation follows. The java.version is passed by -Djava.version=1.8 argument to mvn which seems to be Maven-recommended (or even the only one supported) way of redefining the property from command-line. An inconvenient side-effect is that this property is then propagated down until the JVM that executes the tests and causes java.lang.RuntimeException: Unexpected version format: 1.8 at org.apache.hadoop.hbase.util.ClassSize.(ClassSize.java:119) starting in org.apache.flink.addons.hbase.TableInputFormatITCase (won't bore you with the complete stack trace). Since this is third party code, we can't really fix it in Flink source. Therefore I thought to rename the property instead to avoid the clash. There's a way to keep the java.version property name as is: we can use a maven profile to set java.version. In this case when you activate the profile with -P argument to mvn, the java.version will not be propagated as a system property to the JVM running the tests and won't cause problems. I've noticed that there's an existing profile called "jdk8" which sounds relevant. But it's currently automatically activated if you're running Maven with Java 8 so that could turn out to be a surprise for the maintainers to realize that they suddenly are building 1.8 target bytecode. The profile doesn't do much though so I am questioning if it really needs to be auto-activated. I think it'll be cleaner to stop auto-activating it and reuse it for the above purpose. See proposal in the PR update. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5067) Make Flink compile with 1.8 Java compiler
[ https://issues.apache.org/jira/browse/FLINK-5067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671666#comment-15671666 ] ASF GitHub Bot commented on FLINK-5067: --- Github user melentye commented on a diff in the pull request: https://github.com/apache/flink/pull/2804#discussion_r88332568 --- Diff: pom.xml --- @@ -96,7 +96,7 @@ under the License. 1.7.7 18.0 2.3.7 - 1.7 + 1.7 --- End diff -- I actually prefer java.version for it's conciseness and I share the conservative tune in regards to unnecessary pom.xml changes. But in this case I did it on purpose, long explanation follows. The java.version is passed by -Djava.version=1.8 argument to mvn which seems to be Maven-recommended (or even the only one supported) way of redefining the property from command-line. An inconvenient side-effect is that this property is then propagated down until the JVM that executes the tests and causes java.lang.RuntimeException: Unexpected version format: 1.8 at org.apache.hadoop.hbase.util.ClassSize.(ClassSize.java:119) starting in org.apache.flink.addons.hbase.TableInputFormatITCase (won't bore you with the complete stack trace). Since this is third party code, we can't really fix it in Flink source. Therefore I thought to rename the property instead to avoid the clash. There's a way to keep the java.version property name as is: we can use a maven profile to set java.version. In this case when you activate the profile with -P argument to mvn, the java.version will not be propagated as a system property to the JVM running the tests and won't cause problems. I've noticed that there's an existing profile called "jdk8" which sounds relevant. But it's currently automatically activated if you're running Maven with Java 8 so that could turn out to be a surprise for the maintainers to realize that they suddenly are building 1.8 target bytecode. The profile doesn't do much though so I am questioning if it really needs to be auto-activated. I think it'll be cleaner to stop auto-activating it and reuse it for the above purpose. See proposal in the PR update. > Make Flink compile with 1.8 Java compiler > - > > Key: FLINK-5067 > URL: https://issues.apache.org/jira/browse/FLINK-5067 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.2.0 > Environment: macOS Sierra 10.12.1, java version "1.8.0_112", Apache > Maven 3.3.9 >Reporter: Andrey Melentyev >Priority: Minor > > Flink fails to compile when using 1.8 as source and target in Maven. 
There > are two types of issue that are both related to the new type inference rules: > * Call to TypeSerializer.copy method in TupleSerializer.java:112 now resolves > to a different overload than before causing a compilation error: [ERROR] > /Users/andrey.melentyev/Dev/github.com/apache/flink/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java:[112,63] > incompatible types: void cannot be converted to java.lang.Object > * A number of unit tests using assertEquals fail to compile: > [ERROR] > /Users/andrey.melentyev/Dev/github.com/apache/flink/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java:[50,25] > reference to assertEquals is ambiguous > [ERROR] both method assertEquals(long,long) in org.junit.Assert and method > assertEquals(java.lang.Object,java.lang.Object) in org.junit.Assert match > In both of the above scenarios explicitly casting one of the arguments helps > the compiler to resolve overloaded method call correctly. > It is possible to maintain Flink's code base in a state when it can be built > by both 1.7 and 1.8. For this purpose we need minor code fixes and an > automated build in Travis to keep the new good state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)