[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106034#comment-16106034 ] Aljoscha Krettek commented on FLINK-6996: - [~till.rohrmann] The log is not accessible (at least for me). > FlinkKafkaProducer010 doesn't guarantee at-least-once semantic > -- > > Key: FLINK-6996 > URL: https://issues.apache.org/jira/browse/FLINK-6996 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0, 1.3.2 > > > FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This > means that, when it's used like a "regular sink function" (option a from [the java > doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]), > it will not flush the data on "snapshotState" as it is supposed to. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
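For context, the guarantee at stake comes down to flushing the Kafka client's in-flight records when a checkpoint is taken. Below is a minimal, illustrative sketch of a sink that does this by implementing {{CheckpointedFunction}}; the class name, topic handling, and serialization are assumptions for the example, not the actual FlinkKafkaProducer010 code.
{code}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Illustrative sink: flushes all in-flight records on every checkpoint.
public class FlushingKafkaSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final String topic;          // assumed topic name
    private final Properties kafkaProps; // bootstrap.servers etc.
    private transient KafkaProducer<byte[], byte[]> producer;

    public FlushingKafkaSink(String topic, Properties kafkaProps) {
        this.topic = topic;
        this.kafkaProps = kafkaProps;
    }

    @Override
    public void open(Configuration parameters) {
        kafkaProps.put("key.serializer", ByteArraySerializer.class.getName());
        kafkaProps.put("value.serializer", ByteArraySerializer.class.getName());
        producer = new KafkaProducer<>(kafkaProps);
    }

    @Override
    public void invoke(String value) {
        // send() is asynchronous; the record may still sit in the client's buffer
        producer.send(new ProducerRecord<>(topic, value.getBytes(StandardCharsets.UTF_8)));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Block until every pending record is acknowledged. Without this, a failure
        // right after the checkpoint completes could drop buffered records, which is
        // exactly the at-least-once violation described in this ticket.
        producer.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // nothing to restore; flushing on snapshot is sufficient for at-least-once
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}
{code}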
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106008#comment-16106008 ] Yueting Chen commented on FLINK-7169: - Hi [~dawidwys], About the empty match issue, I am not sure if I get your point; could you show me an example if possible? > Support AFTER MATCH SKIP function in CEP library API > > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Yueting Chen >Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support the AFTER MATCH SKIP function in the CEP API. > There're four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function to the `CEP` class, which takes a new > parameter as AfterMatchSkipStrategy. > The new API may look like this > {code} > public static PatternStream pattern(DataStream input, Pattern > pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` the default option, because that's > how the CEP library behaves currently. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
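To make the proposed API more concrete, here is a hypothetical usage sketch. The three-argument {{CEP.pattern}} overload and the {{AfterMatchSkipStrategy}} factory method are the proposal itself (their names are illustrative), while the pattern-building calls use the existing CEP API; {{Event}} and {{input}} stand in for the user's event type and keyed input stream.
{code}
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Existing API: build a pattern of one or more "down" events.
Pattern<Event, ?> pattern = Pattern.<Event>begin("down")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getPrice() < 100;
        }
    })
    .oneOrMore();

// Proposed API: AFTER MATCH SKIP PAST LAST ROW, i.e. resume matching at the
// row after the last row of the current match.
PatternStream<Event> matches = CEP.pattern(
    input,
    pattern,
    AfterMatchSkipStrategy.skipPastLastRow());
{code}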
[jira] [Commented] (FLINK-7292) Fix EMPTY MATCH bug in CEP.
[ https://issues.apache.org/jira/browse/FLINK-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106007#comment-16106007 ] Yueting Chen commented on FLINK-7292: - Hi [~litrain], Thanks for bringing it up. But I don't think the result should contain an empty match in your example. In general, {{a?}} means we should try to find a single match for {{a}} first; if that fails, an empty result should be generated. I think for the match pattern {{a?}} and the input events {{b1,a1}}, the result should be {{\[empty match],a1}}. IMO, the only value of an empty result is that it indicates how many matches were processed. Maybe in some use cases it is useful. > Fix EMPTY MATCH bug in CEP. > --- > > Key: FLINK-7292 > URL: https://issues.apache.org/jira/browse/FLINK-7292 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: zhangxiaoyu > > Currently, with the pattern {quote}a?{quote} and the event {quote}a1{quote}, > the result pattern is only {quote}a1{quote}, without the empty match. > We wish the empty match were also returned. I am working on this issue > now. > My method checks for an empty match only when the first event comes (at the StartState): try to traverse the PROCEED edges with the trueFunction condition from the StartState, see whether they can arrive at the FinalState, and if so, add an empty list to the result. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
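For reference, the case being debated can be written against the existing CEP API roughly as follows; the {{Event}} type and the condition are illustrative assumptions, not part of the ticket.
{code}
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Pattern "a?": a single optional element.
Pattern<Event, ?> optionalA = Pattern.<Event>begin("a")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            // illustrative condition: events named a1, a2, ... match
            return event.getName().startsWith("a");
        }
    })
    .optional();

// For the input events {b1, a1}: b1 does not satisfy "a", a1 does.
// The comment above argues the output should be {[empty match], a1},
// i.e. one empty match overall rather than one per non-matching event.
{code}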
[jira] [Comment Edited] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105963#comment-16105963 ] Dian Fu edited comment on FLINK-7293 at 7/29/17 1:59 AM: - {quote} Could you explain a bit why this is needed? {quote} As we need to support clauses such as {code} SELECT * FROM Ticker MATCH_RECOGNIZE ( PARTITION BY symbol ORDER BY tstamp, price MEASURES STRT.tstamp AS start_tstamp, LAST(DOWN.tstamp) AS bottom_tstamp, LAST(UP.tstamp) AS end_tstamp ONE ROW PER MATCH AFTER MATCH SKIP TO LAST UP PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.price < PREV(DOWN.price), UP AS UP.price > PREV(UP.price) ) MR {code} There may be multiple columns {{tstamp}} and {{price}} to {{order by}}. {quote} I can't see a way to sort an unbounded stream of data Could you elaborate a bit how do you see it working? how this is going to play well with the Time semantics. When both event-time and a custom order-by is used, who is going to win? {quote} This is working in the same way as the implementation of {{sort by}} in table API. That's to say, both the event-time and the custom order-by will be used and the event-time will be considered with higher priority and the custom order-by will be considered with lower priority. With both event-time and a custom order-by are used, when events come, they will be firstly ordered by the event time and when watermark come, the events with the same event time before watermark will be firstly ordered by the custom order-by before emitted (Please refer to [DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala] for more details) Thoughts? was (Author: dian.fu): {quote} Could you explain a bit why this is needed? {quote} As we need to support clauses such as {code} SELECT * FROM Ticker MATCH_RECOGNIZE ( PARTITION BY symbol ORDER BY tstamp, price MEASURES STRT.tstamp AS start_tstamp, LAST(DOWN.tstamp) AS bottom_tstamp, LAST(UP.tstamp) AS end_tstamp ONE ROW PER MATCH AFTER MATCH SKIP TO LAST UP PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.price < PREV(DOWN.price), UP AS UP.price > PREV(UP.price) ) MR {code} There may be multiple columns to order by. {quote} I can't see a way to sort an unbounded stream of data Could you elaborate a bit how do you see it working? how this is going to play well with the Time semantics. When both event-time and a custom order-by is used, who is going to win? {quote} This is working in the same way as the implementation of {{sort by}} in table API. That's to say, both the event-time and the custom order-by will be used and the event-time should be considered with higher priority and the custom order-by will be considered with lower priorities. With both event-time and a custom order-by are used, when events come, they will be firstly ordered by the event time and when watermark come, the events before watermark with the same event time will firstly ordered by the custom order-by before emitted (Please refer to [DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala] for more details) Thoughts? 
> Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, when {{ProcessingTime}} is configured, the events are fed to NFA > in the order of the arriving time and when {{EventTime}} is configured, the > events are fed to NFA in the order of the event time. It should also allow > custom {{order by}} to allow users to define the order of the events besides > the above factors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105963#comment-16105963 ] Dian Fu edited comment on FLINK-7293 at 7/29/17 1:30 AM: - {quote} Could you explain a bit why this is needed? {quote} As we need to support clauses such as {code} SELECT * FROM Ticker MATCH_RECOGNIZE ( PARTITION BY symbol ORDER BY tstamp, price MEASURES STRT.tstamp AS start_tstamp, LAST(DOWN.tstamp) AS bottom_tstamp, LAST(UP.tstamp) AS end_tstamp ONE ROW PER MATCH AFTER MATCH SKIP TO LAST UP PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.price < PREV(DOWN.price), UP AS UP.price > PREV(UP.price) ) MR {code} There may be multiple columns to order by. {quote} I can't see a way to sort an unbounded stream of data Could you elaborate a bit how do you see it working? how this is going to play well with the Time semantics. When both event-time and a custom order-by is used, who is going to win? {quote} This is working in the same way as the implementation of {{sort by}} in table API. That's to say, both the event-time and the custom order-by will be used and the event-time should be considered with higher priority and the custom order-by will be considered with lower priorities. With both event-time and a custom order-by are used, when events come, they will be firstly ordered by the event time and when watermark come, the events before watermark with the same event time will firstly ordered by the custom order-by before emitted (Please refer to [DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala] for more details) Thoughts? was (Author: dian.fu): {quote} Could you explain a bit why this is needed? {quote} As we need to support clauses such as {code} SELECT * FROM Ticker MATCH_RECOGNIZE ( PARTITION BY symbol ORDER BY tstamp, price MEASURES STRT.tstamp AS start_tstamp, LAST(DOWN.tstamp) AS bottom_tstamp, LAST(UP.tstamp) AS end_tstamp ONE ROW PER MATCH AFTER MATCH SKIP TO LAST UP PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.price < PREV(DOWN.price), UP AS UP.price > PREV(UP.price) ) MR {code} There may be multiple columns to order by. {quote} I can't see a way to sort an unbounded stream of data Could you elaborate a bit how do you see it working? how this is going to play well with the Time semantics. When both event-time and a custom order-by is used, who is going to win? {quote} This is working in the same way as the implementation of {{sort by}} in table API. That's to say, both the event-time and the custom order-by will be used and the event-time should be considered with higher priority and the custom order-by will be considered with lower priorities. (Please refer to [DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala] for more details) Thoughts? > Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, when {{ProcessingTime}} is configured, the events are fed to NFA > in the order of the arriving time and when {{EventTime}} is configured, the > events are fed to NFA in the order of the event time. 
It should also allow > custom {{order by}} to allow users to define the order of the events besides > the above factors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105963#comment-16105963 ] Dian Fu commented on FLINK-7293: {quote} Could you explain a bit why this is needed? {quote} As we need to support clauses such as {code} SELECT * FROM Ticker MATCH_RECOGNIZE ( PARTITION BY symbol ORDER BY tstamp, price MEASURES STRT.tstamp AS start_tstamp, LAST(DOWN.tstamp) AS bottom_tstamp, LAST(UP.tstamp) AS end_tstamp ONE ROW PER MATCH AFTER MATCH SKIP TO LAST UP PATTERN (STRT DOWN+ UP+) DEFINE DOWN AS DOWN.price < PREV(DOWN.price), UP AS UP.price > PREV(UP.price) ) MR {code} There may be multiple columns to order by. {quote} I can't see a way to sort an unbounded stream of data Could you elaborate a bit how do you see it working? how this is going to play well with the Time semantics. When both event-time and a custom order-by is used, who is going to win? {quote} This is working in the same way as the implementation of {{sort by}} in table API. That's to say, both the event-time and the custom order-by will be used and the event-time should be considered with higher priority and the custom order-by will be considered with lower priorities. (Please refer to [DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala] for more details) Thoughts? > Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, when {{ProcessingTime}} is configured, the events are fed to NFA > in the order of the arriving time and when {{EventTime}} is configured, the > events are fed to NFA in the order of the event time. It should also allow > custom {{order by}} to allow users to define the order of the events besides > the above factors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
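To illustrate the two-level ordering described in the comments above (event time has the higher priority; the user-defined order breaks ties when the watermark fires), here is a rough, illustrative sketch in the spirit of DataStreamSort. It assumes a keyed stream and an {{Event}} type with a long-valued {{getTimestamp()}}; neither is part of the actual proposal.
{code}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Buffers events until the watermark passes their timestamp, then emits them ordered
// by event time first and by a user-supplied comparator within equal timestamps.
public class EventTimeThenCustomSort extends ProcessFunction<Event, Event> {

    private final Comparator<Event> customOrder;
    private transient ListState<Event> buffer;

    public EventTimeThenCustomSort(Comparator<Event> customOrder) {
        this.customOrder = customOrder;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("sort-buffer", Event.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        buffer.add(value);
        // fire once the watermark has passed this element's event time
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event e : buffer.get()) {
            if (e.getTimestamp() <= timestamp) {
                ready.add(e);
            } else {
                pending.add(e);
            }
        }
        // event time has the higher priority, the custom order-by the lower one
        ready.sort(Comparator.comparingLong(Event::getTimestamp).thenComparing(customOrder));
        for (Event e : ready) {
            out.collect(e);
        }
        buffer.clear();
        for (Event e : pending) {
            buffer.add(e);
        }
    }
}
{code}
A full implementation would also need a policy for late elements and for state clean-up, which this sketch leaves out.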
[jira] [Closed] (FLINK-7281) Fix various issues in (Maven) release infrastructure
[ https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7281. --- Resolution: Fixed Implemented on master in 50a818b1bb74e9442478157bb10c0d5de05ad665 f1c92f2367183d7e215466b3ff227fff11efc470 Implemented on release-1.3 in 62b67876cec0ef13e35eec242a01029899b916b4 b5c9617b41eb38c104ea5c20f15a1f937c591b40 > Fix various issues in (Maven) release infrastructure > > > Key: FLINK-7281 > URL: https://issues.apache.org/jira/browse/FLINK-7281 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.1, 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.2 > > > I discovered a couple of issues while getting ready for release 1.3.2: > * some old, misleading release scripts and release README > * the _maven-release-plugin_ is not correctly configured for doing actual > releases with it > * the quickstarts are not configured to depend on the project version and > thus require manual updating, also of slf4j and log4j versions > * the _maven-javadoc-plugin_ configuration does not work when using the > _maven-release-plugin_, that is, we have to move the config to the plugin > section and out of the _release_ profile -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7290) Make release scripts modular
[ https://issues.apache.org/jira/browse/FLINK-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7290. --- Resolution: Fixed Implemented on master in 18733d82e694997e229afb50f13c21fcc1c65729 Implemented on release-1.3 in 084c59e0ec7b0800d2612b23e702a9064fe66aac > Make release scripts modular > > > Key: FLINK-7290 > URL: https://issues.apache.org/jira/browse/FLINK-7290 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.3.0, 1.3.1, 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0, 1.3.2 > > > The current release script _create_release_files.sh_ is one monolithic script > that creates a release branch, changes versions in POMs and documentation, > creates a release commit (but not a release tag), creates a source release, > pushes Scala 2.10 and Scala 2.11 artefacts to maven, creates binary > convenience releases for various Hadoop and Scala versions, and stages the source > and binary releases for release voting. > If anything goes wrong in the release process, modification (or a complete > start-over) of the process is required. I'm proposing to create a set of > modular release scripts that each perform a given action. (Actually, I would > like to use the _maven-release-plugin_ for that, but this would require more > work and doesn't work well for releasing with different Scala versions.) > I'm proposing this set of scripts: > * _create_release_branch.sh_: Branch off for a new release, update versions in > POMs and docs, create a release tag. > * _create_source_release.sh_: Self-explanatory > * _deploy_staging_jars.sh_: Self-explanatory > * _create_binary_release.sh_: Create a binary release for a specific version > or for a whole matrix of versions. > Also, having the modular scripts allows, for example, creating the binary > releases (which is time consuming) on a VM somewhere, then fetching them to > the local machine and signing them there. I.e. this doesn't require putting a > private key and passphrase on a remote machine. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7281) Fix various issues in (Maven) release infrastructure
[ https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105552#comment-16105552 ] ASF GitHub Bot commented on FLINK-7281: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4407 @zentol Yes, I'm planning to remove the old script and the documentation for the new scripts is taking shape here: https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release The scripts are not yet described there but I will fill this in as I go. I think it's already quite the improvement compared to the old release "guide": https://cwiki.apache.org/confluence/display/FLINK/Releasing > Fix various issues in (Maven) release infrastructure > > > Key: FLINK-7281 > URL: https://issues.apache.org/jira/browse/FLINK-7281 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.1, 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.2 > > > I discovered a couple of issues while getting ready for release 1.3.2: > * some old, misleading release scripts and release README > * the _maven-release-plugin_ is not correctly configured for doing actual > releases with it > * the quickstarts are not configured to depend on the project version and > thus require manual updating, also of slf4j and log4j versions > * the _maven-javadoc-plugin_ configuration does not work when using the > _maven-release-plugin_, that is, we have to move the config to the plugin > section and out of the _release_ profile -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4407: [FLINK-7281] Fix various issues in (Maven) release infras...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4407 @zentol Yes, I'm planning to remove the old script and the documentation for the new scripts is taking shape here: https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release The scripts are not yet described there but I will fill this in as I go. I think it's already quite the improvement compared to the old release "guide": https://cwiki.apache.org/confluence/display/FLINK/Releasing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7281) Fix various issues in (Maven) release infrastructure
[ https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105550#comment-16105550 ] ASF GitHub Bot commented on FLINK-7281: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4407 > Fix various issues in (Maven) release infrastructure > > > Key: FLINK-7281 > URL: https://issues.apache.org/jira/browse/FLINK-7281 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.1, 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.2 > > > I discovered a couple of issues while getting ready for release 1.3.2: > * some old, misleading release scripts and release README > * the _maven-release-plugin_ is not correctly configured for doing actual > releases with it > * the quickstarts are not configured to depend on the project version and > thus require manual updating, also of slf4j and log4j versions > * the _maven-javadoc-plugin_ configuration does not work when using the > _maven-release-plugin_, that is, we have to move the config to the plugin > section and out of the _release_ profile -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7281) Fix various issues in (Maven) release infrastructure
[ https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105549#comment-16105549 ] ASF GitHub Bot commented on FLINK-7281: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4407 Merged > Fix various issues in (Maven) release infrastructure > > > Key: FLINK-7281 > URL: https://issues.apache.org/jira/browse/FLINK-7281 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.3.1, 1.4.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.2 > > > I discovered a couple of issues while getting ready for release 1.3.2: > * some old, misleading release scripts and release README > * the _maven-release-plugin_ is not correctly configured for doing actual > releases with it > * the quickstarts are not configured to depend on the project version and > thus require manual updating, also of slf4j and log4j versions > * the _maven-javadoc-plugin_ configuration does not work when using the > _maven-release-plugin_, that is, we have to move the config to the plugin > section and out of the _release_ profile -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4407: [FLINK-7281] Fix various issues in (Maven) release...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4407 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4407: [FLINK-7281] Fix various issues in (Maven) release infras...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4407 Merged --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4400: [FLINK-7253] [tests] Remove CommonTestUtils#assumeJava8
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4400 Changes look good but unused imports checkstyle violation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7253) Remove all 'assume Java 8' code in tests
[ https://issues.apache.org/jira/browse/FLINK-7253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105545#comment-16105545 ] ASF GitHub Bot commented on FLINK-7253: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4400 Changes look good but unused imports checkstyle violation. > Remove all 'assume Java 8' code in tests > > > Key: FLINK-7253 > URL: https://issues.apache.org/jira/browse/FLINK-7253 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7250) Drop the jdk8 build profile
[ https://issues.apache.org/jira/browse/FLINK-7250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105543#comment-16105543 ] ASF GitHub Bot commented on FLINK-7250: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4399 LGTM > Drop the jdk8 build profile > --- > > Key: FLINK-7250 > URL: https://issues.apache.org/jira/browse/FLINK-7250 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4399: [FLINK-7250] [build] Remove jdk8 profile
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4399 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7249) Bump Java version in build plugin
[ https://issues.apache.org/jira/browse/FLINK-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105542#comment-16105542 ] ASF GitHub Bot commented on FLINK-7249: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4398 +1 to merge We can also simplify setting the garbage collector in `flink-dist/src/main/flink-bin/bin/taskmanager.sh` and including the `flink-connector-elasticsearch5` module in `flink-connectors/pom.xml`. > Bump Java version in build plugin > - > > Key: FLINK-7249 > URL: https://issues.apache.org/jira/browse/FLINK-7249 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Stephan Ewen > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4398: [FLINK-7249] [build] Bump java.version property to 1.8
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4398 +1 to merge We can also simplify setting the garbage collector in `flink-dist/src/main/flink-bin/bin/taskmanager.sh` and including the `flink-connector-elasticsearch5` module in `flink-connectors/pom.xml`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7092) Shutdown ResourceManager components properly
[ https://issues.apache.org/jira/browse/FLINK-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105224#comment-16105224 ] ASF GitHub Bot commented on FLINK-7092: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4289 > Shutdown ResourceManager components properly > > > Key: FLINK-7092 > URL: https://issues.apache.org/jira/browse/FLINK-7092 > Project: Flink > Issue Type: Sub-task > Components: Mesos >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > The {{MesosResourceManager}} starts internally a {{TaskMonitor}}, > {{LaunchCoordinator}}, {{ConnectionMonitor}} and a > {{ReconciliationCoordinator}}. These components have to be properly shut down > when the {{MesosResourceManager}} closes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7279) MiniCluster can deadlock at shut down
[ https://issues.apache.org/jira/browse/FLINK-7279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7279: Assignee: Nico Kruber > MiniCluster can deadlock at shut down > - > > Key: FLINK-7279 > URL: https://issues.apache.org/jira/browse/FLINK-7279 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber > Labels: flip-6 > > The {{MiniCluster}} can deadlock if the fatal error handler is called > while the {{MiniCluster}} shuts down. The reason is that the shutdown > happens under a lock which is required by the fatal error handler as well. If > the {{MiniCluster}} then tries to shut down the underlying RPC service, which > waits for all actors to terminate, it will never complete because one actor > is still waiting for the lock. > One solution would be to ignore the fatal error handler calls if the > {{MiniCluster}} is shutting down. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/257811319/log.txt -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130053061 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. --- End diff -- ``` ListView provides List functionality for accumulators used by user-defined aggregate functions {{AggregateFunction}}. A ListView can be backed by a Java ArrayList or a state backend, depending on the context in which the function is used. At runtime `ListView` will be replaced by a {@link StateListView} or a {@link HeapListView}. Hence, the `ListView's` method do not need to be implemented. ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105182#comment-16105182 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130038499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * --- End diff -- ScalaDocs are not formatted with HTML: http://docs.scala-lang.org/style/scaladoc.html > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130119984 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -19,13 +19,20 @@ package org.apache.flink.table.runtime.aggregate import org.apache.flink.api.common.functions.Function +import org.apache.flink.table.dataview.{DataViewFactory, HeapViewFactory} import org.apache.flink.types.Row /** * Base class for code-generated aggregations. */ abstract class GeneratedAggregations extends Function { + var factory: DataViewFactory = new HeapViewFactory() + + def getDataViewFactory: DataViewFactory = factory + + def setDataViewFactory(factory: DataViewFactory): Unit = this.factory = factory --- End diff -- I think we should rather add a method `initialize(ctx: RuntimeContext)` and generate code to register the state in this method. IMO, the `DataViewFactory` is also not required, because 1. we can code-gen all of that functionality 2. we can make heap the default for `MapView` and `ListView` such that we only need to replace it if it needs to be backed by state. So there would be only one implementation of `DataViewFactory` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105196#comment-16105196 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130071629 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) --- End diff -- this should be `obj.asInstanceOf[ListViewSerializer[_]].listSerializer` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105185#comment-16105185 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130057272 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView encapsulates the operation of map. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java HashMap or state backend. It will be replaced by a {@link StateMapView} or a + * {@link HeapMapView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *//Overloaded accumulate method + *public void accumulate(MyAccum accumulator, String id) { + * try { + * if (!accumulator.map.contains(id)) { + *accumulator.map.put(id, 1); + *accumulator.count++; + * } + * } catch (Exception e) { + *e.printStackTrace(); + * } + *} + * + *@Override + *public Long getValue(MyAccum accumulator) { + * return accumulator.count; + *} + * } + * + * }}} + * + * @param keyTypeInfo key type information + * @param valueTypeInfo value type information + * @tparam K key type + * @tparam V value type + */ +@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]]) +class MapView[K, V]( --- End diff -- Most comments on `ListState` apply here as well. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130092445 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") + val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "valueTypeInfo") + if (keyTypeInfo != null && valueTypeInfo != null) { + resultMap.put(field.getName, new MapViewTypeInfo( +keyTypeInfo.asInstanceOf[TypeInformation[_]], +valueTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} else if (field.getType == classOf[ListView[_]]) { + val listView = field.get(acc) + val elementTypeInfo = getFieldValue(classOf[ListView[_]], listView, "elementTypeInfo") + if (elementTypeInfo != null) { +resultMap.put(field.getName, + new ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} + } +} + +resultMap + } + + + /** +* Extract data view specification. +* +* @param index aggregate function index +* @param aggFun aggregate function +* @param accType accumulator type information +* @param dataViewTypes data view fields types +* @param isUseState is use state --- End diff -- The comment is not helpful. Please add more detailed parameters descriptions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130059458 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.api.dataview.{ListView, MapView} + +/** + * Heap view factory to create [[HeapListView]] or [[HeapMapView]]. + * + */ +class HeapViewFactory() extends DataViewFactory() { + + override protected def createListView[T](id: String): ListView[T] = new HeapListView[T] + + override protected def createMapView[K, V](id: String): MapView[K, V] = new HeapMapView[K, V] +} + +class HeapListView[T] extends ListView[T] { --- End diff -- Could this be the default implementation of `ListView`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130059497 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.api.dataview.{ListView, MapView} + +/** + * Heap view factory to create [[HeapListView]] or [[HeapMapView]]. + * + */ +class HeapViewFactory() extends DataViewFactory() { + + override protected def createListView[T](id: String): ListView[T] = new HeapListView[T] + + override protected def createMapView[K, V](id: String): MapView[K, V] = new HeapMapView[K, V] +} + +class HeapListView[T] extends ListView[T] { + + val list = new util.ArrayList[T]() + + def this(t: util.List[T]) = { +this() +list.addAll(t) + } + + override def get: JIterable[T] = { +if (!list.isEmpty) { + list +} else { + null +} + } + + override def add(value: T): Unit = list.add(value) + + override def clear(): Unit = list.clear() +} + +class HeapMapView[K, V] extends MapView[K, V] { --- End diff -- Could this be the default implementation of `MapView`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case
Till Rohrmann created FLINK-7297: Summary: Instable Kafka09ProducerITCase.testCustomPartitioning test case Key: FLINK-7297 URL: https://issues.apache.org/jira/browse/FLINK-7297 Project: Flink Issue Type: Bug Components: Kafka Connector, Tests Affects Versions: 1.4.0 Reporter: Till Rohrmann Priority: Critical There seems to be a test instability of {{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on Travis. https://s3.amazonaws.com/archive.travis-ci.org/jobs/258538636/log.txt -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105186#comment-16105186 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130040193 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!") --- End diff -- is this Java's `java.lang.UnsupportedOperationException`? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105206#comment-16105206 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130117980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -108,30 +112,38 @@ object AggregateUtil { outputArity, needRetract = false, needMerge = false, - needReset = false + needReset = false, + accConfig = Some(DataViewConfig(accSpecs, isUseState)) ) +val accConfig = accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + if (isRowTimeType) { if (isRowsClause) { // ROWS unbounded over process function new RowTimeUnboundedRowsOver( genFunction, aggregationStateType, CRowTypeInfo(inputTypeInfo), - queryConfig) + queryConfig, + accConfig) --- End diff -- I think it would be better to keep `accConfig` out of the ProcessFunctions. It is just passing information to the `GeneratedAggregations` which could also be code generated. The only thing that we need is a `RuntimeContext` in `GeneratedAggregations`. Therefore, I propose to add a method `GeneratedAggregations.initialize(ctx: RuntimeContext())` instead of adding `GeneratedAggregations.setDataViewFactory()`. In `initialize()` we can generate code that registers all necessary state by itself and keeps it as member variables. I think this would be cleaner because it encapsulates everything that's related to aggregation functions in the code-gen'd class. If we use heap state in `MapView` and `ListView` as default, we also won't need `DataViewFactory` because we can generate all state access directly (if required). > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
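As a rough sketch of the {{initialize(ctx: RuntimeContext)}} idea discussed above, the generated class could register the state backing each data view directly against the runtime context. The class name, field, and state types below are illustrative assumptions, not the actual generated code.
{code}
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

// Sketch of what a code-generated aggregations class might contain: the state
// backing a MapView accumulator field is registered once in initialize(...).
public class GeneratedAggregationsSketch {

    // backs the accumulator's MapView<String, Integer> field
    private MapState<String, Integer> acc0Map;

    // called from the ProcessFunction's open() with its RuntimeContext
    public void initialize(RuntimeContext ctx) {
        acc0Map = ctx.getMapState(new MapStateDescriptor<>(
            "acc0-map",
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO));
    }
}
{code}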
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130058705 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/DataViewFactory.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.table.api.dataview.{ListView, MapView} + +/** + * Factory to creaate [[ListView]] or [[MapView]]. + * + */ +abstract class DataViewFactory() extends Serializable { --- End diff -- Would we need this if we only need to replace the view if it is backed by a state backend? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105213#comment-16105213 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130073596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) + : CompatibilityResult[ListView[T]] = { +configSnapshot match { + case snapshot: CollectionSerializerConfigSnapshot[_] => +val previousKvSerializersAndConfigs = snapshot.getNestedSerializersAndConfigs --- End diff -- Change name to `previousListSerializerAndConfig` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105198#comment-16105198 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130119984 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -19,13 +19,20 @@ package org.apache.flink.table.runtime.aggregate import org.apache.flink.api.common.functions.Function +import org.apache.flink.table.dataview.{DataViewFactory, HeapViewFactory} import org.apache.flink.types.Row /** * Base class for code-generated aggregations. */ abstract class GeneratedAggregations extends Function { + var factory: DataViewFactory = new HeapViewFactory() + + def getDataViewFactory: DataViewFactory = factory + + def setDataViewFactory(factory: DataViewFactory): Unit = this.factory = factory --- End diff -- I think we should rather add a method `initialize(ctx: RuntimeContext)` and generate code to register the state in this method. IMO, the `DataViewFactory` is also not required, because 1. we can code-gen all of that functionality 2. we can make heap the default for `MapView` and `ListView` such that we only need to replace it if it needs to be backed by state. So there would be only one implementation of `DataViewFactory` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
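Editor's note: complementing the previous sketch, the following hedged example shows roughly what a generated `initialize()` body could expand to for an accumulator with a single `MapView[String, Integer]` field. The class name and state name are made up.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo

// Illustration only: a generated class for an accumulator with one MapView[String, Integer].
class GeneratedAggsWithMapView extends Serializable {

  // State handle kept by the generated code after registration.
  @transient private var idCountMap: MapState[String, java.lang.Integer] = _

  // Proposed hook: register every state-backed data view on the RuntimeContext.
  def initialize(ctx: RuntimeContext): Unit = {
    val descriptor = new MapStateDescriptor[String, java.lang.Integer](
      "agg0$map", // generated, unique per accumulator field (made-up name)
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)
    idCountMap = ctx.getMapState(descriptor)
  }
}
```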
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130039762 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView encapsulates the operation of map. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java HashMap or state backend. It will be replaced by a {@link StateMapView} or a + * {@link HeapMapView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *//Overloaded accumulate method --- End diff -- `accumulate()` is not overloaded here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130112099 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) --- End diff -- Couldn't you instead take the accumulator type information and use the field information? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
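Editor's note: a hedged sketch of the alternative the reviewer hints at, namely walking the accumulator's `TypeInformation` (when it is a `CompositeType`) and collecting the fields whose type information is a `MapViewTypeInfo` or `ListViewTypeInfo`, instead of reflecting over the accumulator instance. The helper name is hypothetical, and the view type-info classes are the ones introduced by this PR.

```scala
import scala.collection.mutable

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}

object DataViewFieldExtractionSketch {

  // Hypothetical helper: collect all data-view fields from the accumulator's type information.
  def dataViewFieldsOf(accType: TypeInformation[_]): mutable.HashMap[String, TypeInformation[_]] = {
    val result = new mutable.HashMap[String, TypeInformation[_]]
    accType match {
      case ct: CompositeType[_] =>
        val fieldNames = ct.getFieldNames
        for (i <- 0 until ct.getArity) {
          val fieldType: TypeInformation[_] = ct.getTypeAt(i)
          fieldType match {
            // These type infos are produced by the @TypeInfo factories on MapView/ListView.
            case _: MapViewTypeInfo[_, _] | _: ListViewTypeInfo[_] =>
              result.put(fieldNames(i), fieldType)
            case _ => // plain accumulator field, nothing to collect
          }
        }
      case _ => // not a composite type, so it cannot contain data views
    }
    result
  }
}
```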
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130038328 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) --- End diff -- Shouldn't this be `ListViewTypeInfoFactory[T]`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130085778 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[HeapMapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. + */ +@Internal +class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V]) --- End diff -- Add a test based on `SerializerTestBase` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
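Editor's note: a test of the kind the reviewer requests would roughly follow `SerializerTestBase`'s four hooks. The sketch below is an assumption-laden outline (the test data and the `HeapMapView` constructor usage are inferred from the serializer code above), not the test that eventually landed.

```scala
import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer}
import org.apache.flink.api.common.typeutils.base.{IntSerializer, MapSerializer, StringSerializer}
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.dataview.{HeapMapView, MapViewSerializer}

class MapViewSerializerTestSketch extends SerializerTestBase[MapView[String, java.lang.Integer]] {

  override protected def createSerializer(): TypeSerializer[MapView[String, java.lang.Integer]] =
    new MapViewSerializer[String, java.lang.Integer](
      new MapSerializer[String, java.lang.Integer](StringSerializer.INSTANCE, IntSerializer.INSTANCE))

  // Variable-length serialization format.
  override protected def getLength: Int = -1

  override protected def getTypeClass: Class[MapView[String, java.lang.Integer]] =
    classOf[MapView[String, java.lang.Integer]]

  override protected def getTestData: Array[MapView[String, java.lang.Integer]] = {
    val filled = new java.util.HashMap[String, java.lang.Integer]()
    filled.put("a", Integer.valueOf(1))
    filled.put("b", Integer.valueOf(2))
    Array[MapView[String, java.lang.Integer]](
      new HeapMapView[String, java.lang.Integer](new java.util.HashMap[String, java.lang.Integer]()),
      new HeapMapView[String, java.lang.Integer](filled))
  }
}
```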
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105200#comment-16105200 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130115155 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") + val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "valueTypeInfo") + if (keyTypeInfo != null && valueTypeInfo != null) { + resultMap.put(field.getName, new MapViewTypeInfo( +keyTypeInfo.asInstanceOf[TypeInformation[_]], +valueTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} else if (field.getType == classOf[ListView[_]]) { + val listView = field.get(acc) + val elementTypeInfo = getFieldValue(classOf[ListView[_]], listView, "elementTypeInfo") + if (elementTypeInfo != null) { +resultMap.put(field.getName, + new ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} + } +} + +resultMap + } + + + /** +* Extract data view specification. +* +* @param index aggregate function index +* @param aggFun aggregate function +* @param accType accumulator type information +* @param dataViewTypes data view fields types +* @param isUseState is use state +* @return the data view specification +*/ + def extractDataViewTypeInfo( --- End diff -- Rename method to `removeStateViewFieldsFromAccTypeInfo`? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105180#comment-16105180 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130038328 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) --- End diff -- Shouldn't this be `ListViewTypeInfoFactory[T]`? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130086721 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.MapSerializer +import org.apache.flink.table.api.dataview.MapView + +/** + * [[MapView]] type information. + * + * @param keyType key type information + * @param valueType value type information + * @tparam K key type + * @tparam V value type + */ +@PublicEvolving +class MapViewTypeInfo[K, V]( +val keyType: TypeInformation[K], +val valueType: TypeInformation[V]) + extends TypeInformation[MapView[K, V]] { + + @PublicEvolving + override def isBasicType = false + + @PublicEvolving + override def isTupleType = false + + @PublicEvolving + override def getArity = 0 + + @PublicEvolving + override def getTotalFields = 2 --- End diff -- should be `1` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130057272 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView encapsulates the operation of map. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java HashMap or state backend. It will be replaced by a {@link StateMapView} or a + * {@link HeapMapView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *//Overloaded accumulate method + *public void accumulate(MyAccum accumulator, String id) { + * try { + * if (!accumulator.map.contains(id)) { + *accumulator.map.put(id, 1); + *accumulator.count++; + * } + * } catch (Exception e) { + *e.printStackTrace(); + * } + *} + * + *@Override + *public Long getValue(MyAccum accumulator) { + * return accumulator.count; + *} + * } + * + * }}} + * + * @param keyTypeInfo key type information + * @param valueTypeInfo value type information + * @tparam K key type + * @tparam V value type + */ +@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]]) +class MapView[K, V]( --- End diff -- Most comments on `ListState` apply here as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105197#comment-16105197 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130085778 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[HeapMapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. + */ +@Internal +class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V]) --- End diff -- Add a test based on `SerializerTestBase` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130115757 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -82,11 +84,13 @@ object AggregateUtil { isRowsClause: Boolean) : ProcessFunction[CRow, CRow] = { -val (aggFields, aggregates, accTypes) = +val isUseState = true --- End diff -- rename to `isStateBackedDataViews`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130086377 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[HeapMapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. 
+ */ +@Internal +class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V]) + extends TypeSerializer[MapView[K, V]] { + + override def isImmutableType: Boolean = mapSerializer.isImmutableType + + override def duplicate(): TypeSerializer[MapView[K, V]] = +new MapViewSerializer[K, V]( + mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]]) + + override def createInstance(): MapView[K, V] = { +new HeapMapView[K, V](mapSerializer.createInstance()) + } + + override def copy(from: MapView[K, V]): MapView[K, V] = { +val map = from.asInstanceOf[HeapMapView[K, V]].map +new HeapMapView[K, V](mapSerializer.copy(map)) + } + + override def copy(from: MapView[K, V], reuse: MapView[K, V]): MapView[K, V] = copy(from) + + override def getLength: Int = -1 // var length + + override def serialize(record: MapView[K, V], target: DataOutputView): Unit = { +val map = record.asInstanceOf[HeapMapView[K, V]].map +mapSerializer.serialize(map, target) + } + + override def deserialize(source: DataInputView): MapView[K, V] = +new HeapMapView[K, V](mapSerializer.deserialize(source)) + + override def deserialize(reuse: MapView[K, V], source: DataInputView): MapView[K, V] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +mapSerializer.copy(source, target) + + override def canEqual(obj: Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = mapSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +mapSerializer.equals(obj.asInstanceOf[MapSerializer[_, _]]) --- End diff -- should be `obj.asInstanceOf[MapViewSerializer[_, _]].map` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
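Editor's note: the pattern the reviewer is pointing at, shown here on a generic wrapper rather than the PR class: `canEqual` should be applied to `obj`, and the comparison should target the other wrapper's nested serializer.

```scala
import org.apache.flink.api.common.typeutils.TypeSerializer

// Generic illustration of the equality pattern for a serializer that wraps another one.
class WrappingSerializerSketch[T](val nested: TypeSerializer[T]) extends Serializable {

  def canEqual(obj: Any): Boolean = obj != null && obj.getClass == getClass

  override def equals(obj: Any): Boolean =
    // canEqual is applied to the other object, and the nested serializers are compared.
    canEqual(obj) && nested.equals(obj.asInstanceOf[WrappingSerializerSketch[_]].nested)

  override def hashCode(): Int = nested.hashCode()
}
```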
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130072175 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) --- End diff -- Please align as: ``` override def ensureCompatibility( configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[ListView[T]] = { configSnapshot match { ... } } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130075088 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.ListSerializer +import org.apache.flink.table.api.dataview.ListView + +/** + * [[ListView]] type information. + * + * @param elementType element type information + * @tparam T element type + */ +@PublicEvolving +class ListViewTypeInfo[T](val elementType: TypeInformation[T]) + extends TypeInformation[ListView[T]] { + + override def isBasicType: Boolean = false + + override def isTupleType: Boolean = false + + override def getArity: Int = 0 --- End diff -- This must be `1`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130054412 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!") + + /** +* Adding the given value to the list. +* +* @param value element to be appended to this list +*/ + def add(value: T): Unit = throw new UnsupportedOperationException("Unsupported operation!") --- End diff -- Just a thought. How about, we implement `ListView` by default as `HeapListView` and only replace it if we it needs to be state-backed. IMO, this has a few benefits: - a UDAGG can be more easily tested because the accumulator does not need to be changed. - it is more efficient, because we only need to touch the accumulator if it needs to be in the state backend. - should be easier to implement What do you think @kaibozhou, @wuchong? 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
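Editor's note: a rough sketch of the default-on-heap idea, with made-up class and field names. The view works out of the box on an `ArrayList`, so a UDAGG can be unit-tested unchanged, and only accumulators that the planner keeps in a state backend get their views swapped for state-backed ones.

```scala
import java.lang.{Iterable => JIterable}
import java.util

import org.apache.flink.api.common.typeinfo.TypeInformation

// Made-up class name; the real change would land in ListView itself.
class HeapBackedListViewSketch[T](val elementTypeInfo: TypeInformation[T]) extends Serializable {

  def this() = this(null)

  // Heap-backed default, so the accumulator works without any replacement.
  private val list = new util.ArrayList[T]()

  // Mirrors the documented contract: null when the list is empty.
  def get: JIterable[T] = if (list.isEmpty) null else list

  def add(value: T): Unit = list.add(value)

  def clear(): Unit = list.clear()
}
```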
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105210#comment-16105210 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130114248 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -561,4 +679,28 @@ object UserDefinedFunctionUtils { } } } + + /** +* Get field value from a object. +* +* @param clazz class to be analyzed. +* @param obj Object to get field value. +* @param fieldName Field name. +* @return Field value. +*/ + def getFieldValue( --- End diff -- Do we need this method if we can get the type infos from the data views with a `private[flink]` method? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130039340 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method --- End diff -- `accumulate` methods are not overloaded. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105190#comment-16105190 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130086695 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.MapSerializer +import org.apache.flink.table.api.dataview.MapView + +/** + * [[MapView]] type information. + * + * @param keyType key type information + * @param valueType value type information + * @tparam K key type + * @tparam V value type + */ +@PublicEvolving +class MapViewTypeInfo[K, V]( +val keyType: TypeInformation[K], +val valueType: TypeInformation[V]) + extends TypeInformation[MapView[K, V]] { + + @PublicEvolving + override def isBasicType = false + + @PublicEvolving + override def isTupleType = false + + @PublicEvolving + override def getArity = 0 --- End diff -- should be `1` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105209#comment-16105209 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130086721 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.MapSerializer +import org.apache.flink.table.api.dataview.MapView + +/** + * [[MapView]] type information. + * + * @param keyType key type information + * @param valueType value type information + * @tparam K key type + * @tparam V value type + */ +@PublicEvolving +class MapViewTypeInfo[K, V]( +val keyType: TypeInformation[K], +val valueType: TypeInformation[V]) + extends TypeInformation[MapView[K, V]] { + + @PublicEvolving + override def isBasicType = false + + @PublicEvolving + override def isTupleType = false + + @PublicEvolving + override def getArity = 0 + + @PublicEvolving + override def getTotalFields = 2 --- End diff -- should be `1` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105194#comment-16105194 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130115757 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -82,11 +84,13 @@ object AggregateUtil { isRowsClause: Boolean) : ProcessFunction[CRow, CRow] = { -val (aggFields, aggregates, accTypes) = +val isUseState = true --- End diff -- rename to `isStateBackedDataViews`? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7118) Remove hadoop1.x code in HadoopUtils
[ https://issues.apache.org/jira/browse/FLINK-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105237#comment-16105237 ] ASF GitHub Bot commented on FLINK-7118: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4285 > Remove hadoop1.x code in HadoopUtils > > > Key: FLINK-7118 > URL: https://issues.apache.org/jira/browse/FLINK-7118 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.4.0 > > > Since flink no longer support hadoop 1.x version, we should remove it. Below > code reside in {{org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils}} > > {code:java} > public static JobContext instantiateJobContext(Configuration configuration, > JobID jobId) throws Exception { > try { > Class clazz; > // for Hadoop 1.xx > if(JobContext.class.isInterface()) { > clazz = > Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, > Thread.currentThread().getContextClassLoader()); > } > // for Hadoop 2.xx > else { > clazz = > Class.forName("org.apache.hadoop.mapreduce.JobContext", true, > Thread.currentThread().getContextClassLoader()); > } > Constructor constructor = > clazz.getConstructor(Configuration.class, JobID.class); > JobContext context = (JobContext) > constructor.newInstance(configuration, jobId); > > return context; > } catch(Exception e) { > throw new Exception("Could not create instance of > JobContext."); > } > } > {code} > And > {code:java} > public static TaskAttemptContext > instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID > taskAttemptID) throws Exception { > try { > Class clazz; > // for Hadoop 1.xx > if(JobContext.class.isInterface()) { > clazz = > Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); > } > // for Hadoop 2.xx > else { > clazz = > Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext"); > } > Constructor constructor = > clazz.getConstructor(Configuration.class, TaskAttemptID.class); > TaskAttemptContext context = (TaskAttemptContext) > constructor.newInstance(configuration, taskAttemptID); > > return context; > } catch(Exception e) { > throw new Exception("Could not create instance of > TaskAttemptContext."); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7118) Remove hadoop1.x code in HadoopUtils
[ https://issues.apache.org/jira/browse/FLINK-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7118. -- Resolution: Fixed Fixed via 5725e63309269a02d93046df5cca724eefb25e2d > Remove hadoop1.x code in HadoopUtils > > > Key: FLINK-7118 > URL: https://issues.apache.org/jira/browse/FLINK-7118 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.4.0 > > > Since flink no longer support hadoop 1.x version, we should remove it. Below > code reside in {{org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils}} > > {code:java} > public static JobContext instantiateJobContext(Configuration configuration, > JobID jobId) throws Exception { > try { > Class clazz; > // for Hadoop 1.xx > if(JobContext.class.isInterface()) { > clazz = > Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, > Thread.currentThread().getContextClassLoader()); > } > // for Hadoop 2.xx > else { > clazz = > Class.forName("org.apache.hadoop.mapreduce.JobContext", true, > Thread.currentThread().getContextClassLoader()); > } > Constructor constructor = > clazz.getConstructor(Configuration.class, JobID.class); > JobContext context = (JobContext) > constructor.newInstance(configuration, jobId); > > return context; > } catch(Exception e) { > throw new Exception("Could not create instance of > JobContext."); > } > } > {code} > And > {code:java} > public static TaskAttemptContext > instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID > taskAttemptID) throws Exception { > try { > Class clazz; > // for Hadoop 1.xx > if(JobContext.class.isInterface()) { > clazz = > Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); > } > // for Hadoop 2.xx > else { > clazz = > Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext"); > } > Constructor constructor = > clazz.getConstructor(Configuration.class, TaskAttemptID.class); > TaskAttemptContext context = (TaskAttemptContext) > constructor.newInstance(configuration, taskAttemptID); > > return context; > } catch(Exception e) { > throw new Exception("Could not create instance of > TaskAttemptContext."); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
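Once the Hadoop 1.x branch is gone, the reflective class lookup shown in the issue description is unnecessary and both helpers collapse to direct constructor calls against the Hadoop 2 implementation classes. A rough sketch of that simplified shape follows; it is written in Scala only for consistency with the other sketches in this digest (the real utility class is Java) and is not necessarily identical to the committed fix.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}

object HadoopUtilsSketch {

  // Hadoop 2.x always ships JobContextImpl, so no Class.forName lookup is needed
  def instantiateJobContext(configuration: Configuration, jobId: JobID): JobContext =
    new JobContextImpl(configuration, jobId)

  // same simplification for the task attempt context
  def instantiateTaskAttemptContext(
      configuration: Configuration,
      taskAttemptID: TaskAttemptID): TaskAttemptContext =
    new TaskAttemptContextImpl(configuration, taskAttemptID)
}
```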
[jira] [Commented] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils
[ https://issues.apache.org/jira/browse/FLINK-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105236#comment-16105236 ] ASF GitHub Bot commented on FLINK-7134: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4362 > Remove hadoop1.x code in mapreduce.utils.HadoopUtils > > > Key: FLINK-7134 > URL: https://issues.apache.org/jira/browse/FLINK-7134 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.4.0 > > > This jira is similar to FLINK-7118. And for a clearer format and a review, I > separated the two jira. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4362: [FLINK-7134] Remove hadoop1.x code in mapreduce.ut...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4362
[GitHub] flink pull request #4285: [FLINK-7118] [hadoop] Remove hadoop1.x code in Had...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4285
[jira] [Resolved] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils
[ https://issues.apache.org/jira/browse/FLINK-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7134. -- Resolution: Fixed Fixed via 8e367731019ee70e2f6dc1be21c80f51a2ef6a2b > Remove hadoop1.x code in mapreduce.utils.HadoopUtils > > > Key: FLINK-7134 > URL: https://issues.apache.org/jira/browse/FLINK-7134 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: mingleizhang >Assignee: mingleizhang > Fix For: 1.4.0 > > > This jira is similar to FLINK-7118. And for a clearer format and a review, I > separated the two jira. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7092) Shutdown ResourceManager components properly
[ https://issues.apache.org/jira/browse/FLINK-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7092. -- Resolution: Fixed Fix Version/s: 1.4.0 Fixed via 6d3cffc17be5f4f5b586ef9fa1b2feb9204369dd > Shutdown ResourceManager components properly > > > Key: FLINK-7092 > URL: https://issues.apache.org/jira/browse/FLINK-7092 > Project: Flink > Issue Type: Sub-task > Components: Mesos >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > The {{MesosResourceManager}} starts internally a {{TaskMonitor}}, > {{LaunchCoordinator}}, {{ConnectionMonitor}} and a > {{ReconciliationCoordinator}}. These components have to be properly shut down > when the {{MesosResourceManager}} closes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4289
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130071770 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) --- End diff -- There is `SerializerTestBase` which can be used to implement unit tests for `TypeSerializer` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
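To make the `SerializerTestBase` pointer concrete, a unit test for the new serializer could be set up roughly as sketched below. It assumes the PR's `ListViewSerializer` and `HeapListView` plus value-based equality on the views (which the test base relies on when comparing deserialized copies with the originals); it is an illustration, not the test that was ultimately added.

```scala
import org.apache.flink.api.common.typeutils.{SerializerTestBase, TypeSerializer}
import org.apache.flink.api.common.typeutils.base.{ListSerializer, StringSerializer}
import org.apache.flink.table.api.dataview.ListView
import org.apache.flink.table.dataview.{HeapListView, ListViewSerializer}

class ListViewSerializerTest extends SerializerTestBase[ListView[String]] {

  override protected def createSerializer(): TypeSerializer[ListView[String]] =
    new ListViewSerializer[String](new ListSerializer[String](StringSerializer.INSTANCE))

  override protected def getLength: Int = -1

  override protected def getTypeClass: Class[ListView[String]] = classOf[ListView[String]]

  override protected def getTestData: Array[ListView[String]] = {
    val nonEmpty = new HeapListView[String]
    nonEmpty.add("a")
    nonEmpty.add("b")
    val empty = new HeapListView[String]
    Array[ListView[String]](nonEmpty, empty)
  }
}
```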
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130106082 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( --- End diff -- Please add comments to this method --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7296) Validate commit messages in git pre-receive hook
[ https://issues.apache.org/jira/browse/FLINK-7296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105217#comment-16105217 ] Greg Hogan commented on FLINK-7296: --- [~uce] I'm trying to think through where this should run. On the client side committers may not have (pre-push?) hook configured on a repo so changes could slip through. GitHub doesn't allow server-side hooks (instead sending out webhooks notifications). Apache Infra may allow a server-side hook but would require a ticket to update (and if ever switching to GitBox would require committers to continue pushing to the Apache-hosted repo). > Validate commit messages in git pre-receive hook > > > Key: FLINK-7296 > URL: https://issues.apache.org/jira/browse/FLINK-7296 > Project: Flink > Issue Type: Improvement >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Would like to investigate a pre-receive (server-side) hook analyzing the > commit message incoming revisions on the {{master}} branch for the standard > JIRA format ({{\[FLINK-\] \[component\] ...}} or {{\[hotfix\] > \[component\] ...}}). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105195#comment-16105195 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130106082 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( --- End diff -- Please add comments to this method > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105199#comment-16105199 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130074361 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) --- End diff -- Can this method be easier implemented by calling `listSerializer.ensureCompatibility` and checking if the returned `CompatibilityResult` requires migration. If the passed serializer is not `null` it is wrapped in a `ListViewSerializer`. This won't work for `MapViewSerializer` because it has to handle two serializers, but for `ListViewSerializer` it should work. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink >
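A standalone sketch of the delegation idea from that comment, pulled out as a helper so the shape is visible (an assumption about how it could look, not the merged code): because `snapshotConfiguration()` simply forwards the wrapped `ListSerializer`'s snapshot, the compatibility check can hand the snapshot straight back to the nested serializer and only re-wrap the verdict.

```scala
import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializerConfigSnapshot}
import org.apache.flink.api.common.typeutils.base.ListSerializer
import org.apache.flink.table.api.dataview.ListView

object CompatibilityDelegationSketch {

  def ensureListViewCompatibility[T](
      listSerializer: ListSerializer[T],
      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[ListView[T]] = {

    // the snapshot written by ListViewSerializer is the ListSerializer snapshot,
    // so the nested serializer can judge compatibility itself
    val nested = listSerializer.ensureCompatibility(configSnapshot)

    if (nested.isRequiresMigration) {
      // a fallback convert deserializer, if one were provided, would additionally
      // have to be wrapped in a ListViewSerializer here, as the reviewer notes
      CompatibilityResult.requiresMigration[ListView[T]]()
    } else {
      CompatibilityResult.compatible[ListView[T]]()
    }
  }
}
```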
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105201#comment-16105201 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130071770 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) --- End diff -- There is `SerializerTestBase` which can be used to implement unit tests for `TypeSerializer` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105202#comment-16105202 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130113134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") --- End diff -- If we add `private[flink]` methods to `MapView` (and ListView) to access the key and value type infos we don't need to use reflection. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
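The suggestion above would make the reflective lookup unnecessary: if the view classes expose their type information to Flink-internal code, the extraction logic can read it directly instead of going through `getFieldValue`. A minimal sketch of such accessors, assumed rather than taken from the merged API:

```scala
package org.apache.flink.table.api.dataview

import org.apache.flink.api.common.typeinfo.TypeInformation

// the type infos become vals visible to code inside org.apache.flink, so the
// extraction utilities can simply read mapView.keyTypeInfo / mapView.valueTypeInfo
class MapView[K, V](
    private[flink] val keyTypeInfo: TypeInformation[K],
    private[flink] val valueTypeInfo: TypeInformation[V]) extends Serializable {

  def this() = this(null, null)

  // map operations (get/put/remove/...) elided, unchanged from the PR
}
```

With that in place, `getDataViewTypeInfoFromConstructor` could pattern match on the field value and use the accessors directly.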
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130071629 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) --- End diff -- this should be `obj.asInstanceOf[ListViewSerializer[_]].listSerializer` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
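Spelled out, the fix asked for above could look like the fragment below. Two small assumptions are baked in: the constructor parameter is promoted to a `val` (otherwise the wrapped serializer of the other instance is not accessible from `equals` at all), and `canEqual` is applied to `obj` rather than `this`, which the quoted line also gets backwards.

```scala
import org.apache.flink.api.common.typeutils.base.ListSerializer

// skeleton limited to the members the review comment touches
class ListViewSerializerEqualitySketch[T](val listSerializer: ListSerializer[T]) {

  def canEqual(obj: Any): Boolean = obj != null && obj.getClass == getClass

  override def equals(obj: Any): Boolean =
    canEqual(obj) &&
      listSerializer.equals(obj.asInstanceOf[ListViewSerializerEqualitySketch[_]].listSerializer)

  override def hashCode(): Int = listSerializer.hashCode()
}
```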
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105207#comment-16105207 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130059497 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.api.dataview.{ListView, MapView} + +/** + * Heap view factory to create [[HeapListView]] or [[HeapMapView]]. + * + */ +class HeapViewFactory() extends DataViewFactory() { + + override protected def createListView[T](id: String): ListView[T] = new HeapListView[T] + + override protected def createMapView[K, V](id: String): MapView[K, V] = new HeapMapView[K, V] +} + +class HeapListView[T] extends ListView[T] { + + val list = new util.ArrayList[T]() + + def this(t: util.List[T]) = { +this() +list.addAll(t) + } + + override def get: JIterable[T] = { +if (!list.isEmpty) { + list +} else { + null +} + } + + override def add(value: T): Unit = list.add(value) + + override def clear(): Unit = list.clear() +} + +class HeapMapView[K, V] extends MapView[K, V] { --- End diff -- Could this be the default implementation of `MapView`? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105179#comment-16105179 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130058705 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/DataViewFactory.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.table.api.dataview.{ListView, MapView} + +/** + * Factory to creaate [[ListView]] or [[MapView]]. + * + */ +abstract class DataViewFactory() extends Serializable { --- End diff -- Would we need this if we only need to replace the view if it is backed by a state backend? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130073508 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) + : CompatibilityResult[ListView[T]] = { +configSnapshot match { + case snapshot: CollectionSerializerConfigSnapshot[_] => +val previousKvSerializersAndConfigs = snapshot.getNestedSerializersAndConfigs --- End diff -- use `getSingleNestedSerializerAndConfig()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130059825 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. + */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType --- End diff -- return `false` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105183#comment-16105183 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130054412 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!") + + /** +* Adding the given value to the list. +* +* @param value element to be appended to this list +*/ + def add(value: T): Unit = throw new UnsupportedOperationException("Unsupported operation!") --- End diff -- Just a thought. How about, we implement `ListView` by default as `HeapListView` and only replace it if we it needs to be state-backed. IMO, this has a few benefits: - a UDAGG can be more easily tested because the accumulator does not need to be changed. 
- it is more efficient, because we only need to touch the accumulator if it needs to be in the state backend. - should be easier to implement What do you think @kaibozhou, @wuchong? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
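A sketch of the proposed default, to make the trade-off concrete (this is an assumption about the design under discussion, not the merged code): `ListView` itself is backed by a plain `ArrayList`, so accumulators behave normally in tests and on the heap, and the runtime swaps in a state-backed subclass only when the accumulator is kept in a state backend.

```scala
import java.lang.{Iterable => JIterable}
import java.util

import org.apache.flink.api.common.state.ListState
import org.apache.flink.api.common.typeinfo.TypeInformation

// default implementation: an ordinary in-memory list, usable without any runtime support
class ListView[T](val elementTypeInfo: TypeInformation[T]) extends Serializable {

  def this() = this(null)

  var list: util.List[T] = new util.ArrayList[T]()

  def get: JIterable[T] = if (list.isEmpty) null else list

  def add(value: T): Unit = list.add(value)

  def clear(): Unit = list.clear()
}

// hypothetical state-backed variant the code generator could substitute; the ListState
// stands in for whatever keyed state handle the operator wires up per accumulator field
class StateListView[T](state: ListState[T]) extends ListView[T](null) {

  override def get: JIterable[T] = state.get()

  override def add(value: T): Unit = state.add(value)

  override def clear(): Unit = state.clear()
}
```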
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105178#comment-16105178 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130039340 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method --- End diff -- `accumulate` methods are not overloaded. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105208#comment-16105208 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130072175 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) --- End diff -- Please align as: ``` override def ensureCompatibility( configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[ListView[T]] = { configSnapshot match { ... } } ``` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > I
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105189#comment-16105189 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130051854 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { --- End diff -- `with ListState[T]` or `with AppendingState[T, JIterable[T]]`? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
[ https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-6996: -- There seems to be a test instability with {{Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}. https://s3.amazonaws.com/archive.travis-ci.org/jobs/258538641/log.txt > FlinkKafkaProducer010 doesn't guarantee at-least-once semantic > -- > > Key: FLINK-6996 > URL: https://issues.apache.org/jira/browse/FLINK-6996 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0, 1.3.2 > > > FlinkKafkaProducer010 doesn't implement CheckpointedFunction interface. This > means, when it's used like a "regular sink function" (option a from [the java > doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html]) > it will not flush the data on "snapshotState" as it is supposed to. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130114248 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -561,4 +679,28 @@ object UserDefinedFunctionUtils { } } } + + /** +* Get field value from an object. +* +* @param clazz class to be analyzed. +* @param obj Object to get the field value from. +* @param fieldName Field name. +* @return Field value. +*/ + def getFieldValue( --- End diff -- Do we need this method if we can get the type infos from the data views with a `private[flink]` method? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105211#comment-16105211 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130112099 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) --- End diff -- Couldn't you instead take the accumulator type information and use the field information? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
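A rough sketch of the alternative hinted at in the comment above — deriving the data view fields from the accumulator's composite type information instead of instantiating the accumulator and reflecting over its fields — could look as follows. It assumes the accumulator type is extracted as a `PojoTypeInfo` and that the TypeInfo factories on `ListView`/`MapView` yield `ListViewTypeInfo`/`MapViewTypeInfo` for the view fields; the object and method names are illustrative, not part of the PR.
```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.PojoTypeInfo
import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}

import scala.collection.mutable

object DataViewFieldExtraction {

  // Sketch: read the data view fields from the accumulator's composite type
  // information instead of creating an accumulator instance and reflecting on it.
  // Assumes a PojoTypeInfo accumulator whose ListView/MapView fields carry
  // ListViewTypeInfo/MapViewTypeInfo (as produced by their TypeInfo factories).
  def dataViewFieldsFromAccType(
      accType: TypeInformation[_]): mutable.HashMap[String, TypeInformation[_]] = {

    val result = new mutable.HashMap[String, TypeInformation[_]]
    accType match {
      case pojo: PojoTypeInfo[_] =>
        for (i <- 0 until pojo.getArity) {
          val pojoField = pojo.getPojoFieldAt(i)
          pojoField.getTypeInformation match {
            case lv: ListViewTypeInfo[_]   => result.put(pojoField.getField.getName, lv)
            case mv: MapViewTypeInfo[_, _] => result.put(pojoField.getField.getName, mv)
            case _ => // plain accumulator field, nothing to register
          }
        }
      case _ => // non-POJO accumulators carry no data view fields in this sketch
    }
    result
  }
}
```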
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105187#comment-16105187 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130059458 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.api.dataview.{ListView, MapView} + +/** + * Heap view factory to create [[HeapListView]] or [[HeapMapView]]. + * + */ +class HeapViewFactory() extends DataViewFactory() { + + override protected def createListView[T](id: String): ListView[T] = new HeapListView[T] + + override protected def createMapView[K, V](id: String): MapView[K, V] = new HeapMapView[K, V] +} + +class HeapListView[T] extends ListView[T] { --- End diff -- Could this be the default implementation of `ListView`? > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130074361 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) --- End diff -- Can this method be easier implemented by calling `listSerializer.ensureCompatibility` and checking if the returned `CompatibilityResult` requires migration. If the passed serializer is not `null` it is wrapped in a `ListViewSerializer`. This won't work for `MapViewSerializer` because it has to handle two serializers, but for `ListViewSerializer` it should work. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
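A minimal sketch of the delegation suggested in this comment, assuming the Flink 1.3/1.4-era `CompatibilityResult` API (`compatible()`, `requiresMigration()`, `isRequiresMigration`); wrapping a provided convert deserializer in a `ListViewSerializer`, as the comment mentions, is omitted for brevity:
```
// Inside ListViewSerializer[T] (sketch only):
override def ensureCompatibility(
    configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[ListView[T]] = {

  // The nested ListSerializer already understands the
  // CollectionSerializerConfigSnapshot, so simply forward the snapshot to it.
  val listCompatResult = listSerializer.ensureCompatibility(configSnapshot)

  if (listCompatResult.isRequiresMigration) {
    // A provided convert deserializer could be wrapped in a ListViewSerializer
    // here, as the comment suggests; omitted to keep the sketch short.
    CompatibilityResult.requiresMigration[ListView[T]]()
  } else {
    CompatibilityResult.compatible[ListView[T]]()
  }
}
```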
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130040193 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!") --- End diff -- is this Java's `java.lang.UnsupportedOperationException`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130073596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) + : CompatibilityResult[ListView[T]] = { +configSnapshot match { + case snapshot: CollectionSerializerConfigSnapshot[_] => +val previousKvSerializersAndConfigs = snapshot.getNestedSerializersAndConfigs --- End diff -- Change name to `previousListSerializerAndConfig` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105181#comment-16105181 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130053061 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. --- End diff -- ``` ListView provides List functionality for accumulators used by user-defined aggregate functions {{AggregateFunction}}. A ListView can be backed by a Java ArrayList or a state backend, depending on the context in which the function is used. At runtime `ListView` will be replaced by a {@link StateListView} or a {@link HeapListView}. Hence, the `ListView's` method do not need to be implemented. ``` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130113134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") --- End diff -- If we add `private[flink]` methods to `MapView` (and ListView) to access the key and value type infos we don't need to use reflection. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
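If the element, key and value type information were reachable inside the `flink` package (for example as `private[flink]` vals — only a suggestion in this thread, not something the PR already provides), the reflective `getFieldValue` lookups could be replaced by a direct pattern match. A small helper in that spirit, with assumed accessor names:
```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.dataview.{DataView, ListView, MapView}
import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}

// This helper would have to live under org.apache.flink.table to see
// private[flink] members; the accessor names below are assumptions.
object DataViewTypes {

  // Sketch: assumes elementTypeInfo / keyTypeInfo / valueTypeInfo are (made)
  // accessible without reflection.
  def typeInfoOf(view: DataView): Option[TypeInformation[_]] = view match {
    case l: ListView[_] if l.elementTypeInfo != null =>
      Some(new ListViewTypeInfo(l.elementTypeInfo))
    case m: MapView[_, _] if m.keyTypeInfo != null && m.valueTypeInfo != null =>
      Some(new MapViewTypeInfo(m.keyTypeInfo, m.valueTypeInfo))
    case _ => None
  }
}
```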
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130104980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") + val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "valueTypeInfo") + if (keyTypeInfo != null && valueTypeInfo != null) { + resultMap.put(field.getName, new MapViewTypeInfo( +keyTypeInfo.asInstanceOf[TypeInformation[_]], +valueTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} else if (field.getType == classOf[ListView[_]]) { + val listView = field.get(acc) + val elementTypeInfo = getFieldValue(classOf[ListView[_]], listView, "elementTypeInfo") + if (elementTypeInfo != null) { +resultMap.put(field.getName, + new ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} + } +} + +resultMap + } + + + /** +* Extract data view specification. +* +* @param index aggregate function index +* @param aggFun aggregate function +* @param accType accumulator type information +* @param dataViewTypes data view fields types +* @param isUseState is use state +* @return the data view specification +*/ + def extractDataViewTypeInfo( --- End diff -- This method should get some inline comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130085690 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[HeapMapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. + */ +@Internal +class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V]) + extends TypeSerializer[MapView[K, V]] { + + override def isImmutableType: Boolean = mapSerializer.isImmutableType --- End diff -- `false` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130126823 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { --- End diff -- I don't think we should implement `COUNT DISTINCT` as a special `AggregateFunction`. At least not in the long term. I think it would be better to handle this inside of the `GeneratedAggregations` and only accumulate and retract distinct values from user-defined aggregate functions. With this approach, any aggregation function can be used with `DISTINCT` and the state for distinction can also be shared across multiple aggregation functions. This is also the approach that has been started in PR #3783. For now this is fine, but in the long run we should go for something like PR #3783 (which also requires the `GeneratedAggregations.initialize()` method.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
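To make the shared-state idea concrete: the generated code could keep a reference count per value in a `MapView` and forward only the first accumulate and the last retract of each value to the wrapped aggregate. The class below is purely illustrative — names and placement are assumptions, and the real mechanism would be code-generated as in PR #3783:
```
import org.apache.flink.table.api.dataview.MapView

// Illustrative only: reference-counted distinct filtering on top of a MapView.
// Assumes MapView offers get/put/remove like MapState and that get returns null
// for unseen keys; class and method names are not part of the PR.
class DistinctFilter[T](val counts: MapView[T, java.lang.Long]) {

  /** Returns true iff the value is new and should be accumulated downstream. */
  def add(value: T): Boolean = {
    val cnt = counts.get(value)
    if (cnt == null) {
      counts.put(value, 1L)
      true
    } else {
      counts.put(value, cnt + 1)
      false
    }
  }

  /** Returns true iff this was the last occurrence and should be retracted downstream. */
  def remove(value: T): Boolean = {
    val cnt = counts.get(value)
    if (cnt == null) {
      false
    } else if (cnt == 1L) {
      counts.remove(value)
      true
    } else {
      counts.put(value, cnt - 1)
      false
    }
  }
}
```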
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130117980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -108,30 +112,38 @@ object AggregateUtil { outputArity, needRetract = false, needMerge = false, - needReset = false + needReset = false, + accConfig = Some(DataViewConfig(accSpecs, isUseState)) ) +val accConfig = accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + if (isRowTimeType) { if (isRowsClause) { // ROWS unbounded over process function new RowTimeUnboundedRowsOver( genFunction, aggregationStateType, CRowTypeInfo(inputTypeInfo), - queryConfig) + queryConfig, + accConfig) --- End diff -- I think it would be better to keep `accConfig` out of the ProcessFunctions. It is just passing information to the `GeneratedAggregations` which could also be code generated. The only thing that we need is a `RuntimeContext` in `GeneratedAggregations`. Therefore, I propose to add a method `GeneratedAggregations.initialize(ctx: RuntimeContext())` instead of adding `GeneratedAggregations.setDataViewFactory()`. In `initialize()` we can generate code that registers all necessary state by itself and keeps it as member variables. I think this would be cleaner because it encapsulates everything that's related to aggregation functions in the code-gen'd class. If we use heap state in `MapView` and `ListView` as default, we also won't need `DataViewFactory` because we can generate all state access directly (if required). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
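A sketch of the proposed lifecycle hook, showing what the code generator might emit for one accumulator holding a single `MapView[String, Long]`; class, method and state names are illustrative rather than the PR's actual API:
```
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo

// Sketch: the ProcessFunction calls initialize(getRuntimeContext) from open(), and
// the generated class registers exactly the state its data views need.
abstract class GeneratedAggregationsWithInit {
  def initialize(ctx: RuntimeContext): Unit
}

// What the code generator might emit for one MapView[String, Long] field.
class ExampleGeneratedAggregations extends GeneratedAggregationsWithInit {

  // backing state for the accumulator's MapView field (hypothetical name)
  private var acc0Map: MapState[String, java.lang.Long] = _

  override def initialize(ctx: RuntimeContext): Unit = {
    // the data view specs are known at code-generation time, so the generated code
    // can build its own descriptors instead of receiving an accConfig map at runtime
    val desc = new MapStateDescriptor[String, java.lang.Long](
      "acc0$map",
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO)
    acc0Map = ctx.getMapState(desc)
  }
}
```
With this shape, the ProcessFunction's open() would only call function.initialize(getRuntimeContext()), so no accConfig map would need to be threaded through its constructor.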
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105191#comment-16105191 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130073508 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[HeapListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = listSerializer.isImmutableType + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = new HeapListView[T](listSerializer.createInstance()) + + override def copy(from: ListView[T]): ListView[T] = { +val list = from.asInstanceOf[HeapListView[T]].list +new HeapListView[T](listSerializer.copy(list)) + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +val list = record.asInstanceOf[HeapListView[T]].list +listSerializer.serialize(list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = +new HeapListView[T](listSerializer.deserialize(source)) + + override def deserialize(reuse: ListView[T], source: DataInputView): ListView[T] = +deserialize(source) + + override def copy(source: DataInputView, target: DataOutputView): Unit = +listSerializer.copy(source, target) + + override def canEqual(obj: scala.Any): Boolean = obj != null && obj.getClass == getClass + + override def hashCode(): Int = listSerializer.hashCode() + + override def equals(obj: Any): Boolean = canEqual(this) && +listSerializer.equals(obj.asInstanceOf[ListSerializer[_]]) + + override def snapshotConfiguration(): TypeSerializerConfigSnapshot = +listSerializer.snapshotConfiguration() + + // copy and modified from ListSerializer.ensureCompatibility + override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot) + : CompatibilityResult[ListView[T]] = { +configSnapshot match { + case snapshot: CollectionSerializerConfigSnapshot[_] => +val previousKvSerializersAndConfigs = snapshot.getNestedSerializersAndConfigs --- End diff -- use `getSingleNestedSerializerAndConfig()` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaib
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130038499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * --- End diff -- ScalaDocs are not formatted with HTML: http://docs.scala-lang.org/style/scaladoc.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130086695 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.MapSerializer +import org.apache.flink.table.api.dataview.MapView + +/** + * [[MapView]] type information. + * + * @param keyType key type information + * @param valueType value type information + * @tparam K key type + * @tparam V value type + */ +@PublicEvolving +class MapViewTypeInfo[K, V]( +val keyType: TypeInformation[K], +val valueType: TypeInformation[V]) + extends TypeInformation[MapView[K, V]] { + + @PublicEvolving + override def isBasicType = false + + @PublicEvolving + override def isTupleType = false + + @PublicEvolving + override def getArity = 0 --- End diff -- should be `1` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105184#comment-16105184 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130039762 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView encapsulates the operation of map. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java HashMap or state backend. It will be replaced by a {@link StateMapView} or a + * {@link HeapMapView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *//Overloaded accumulate method --- End diff -- `accumulate()` is not overloaded here. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130115155 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") + val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "valueTypeInfo") + if (keyTypeInfo != null && valueTypeInfo != null) { + resultMap.put(field.getName, new MapViewTypeInfo( +keyTypeInfo.asInstanceOf[TypeInformation[_]], +valueTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} else if (field.getType == classOf[ListView[_]]) { + val listView = field.get(acc) + val elementTypeInfo = getFieldValue(classOf[ListView[_]], listView, "elementTypeInfo") + if (elementTypeInfo != null) { +resultMap.put(field.getName, + new ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]])) + } +} + } +} + +resultMap + } + + + /** +* Extract data view specification. +* +* @param index aggregate function index +* @param aggFun aggregate function +* @param accType accumulator type information +* @param dataViewTypes data view fields types +* @param isUseState is use state +* @return the data view specification +*/ + def extractDataViewTypeInfo( --- End diff -- Rename method to `removeStateViewFieldsFromAccTypeInfo`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105205#comment-16105205 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130075088 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.ListSerializer +import org.apache.flink.table.api.dataview.ListView + +/** + * [[ListView]] type information. + * + * @param elementType element type information + * @tparam T element type + */ +@PublicEvolving +class ListViewTypeInfo[T](val elementType: TypeInformation[T]) + extends TypeInformation[ListView[T]] { + + override def isBasicType: Boolean = false + + override def isTupleType: Boolean = false + + override def getArity: Int = 0 --- End diff -- This must be `1`. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105204#comment-16105204 ] ASF GitHub Bot commented on FLINK-7206: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130085690 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[HeapMapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. + */ +@Internal +class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V]) + extends TypeSerializer[MapView[K, V]] { + + override def isImmutableType: Boolean = mapSerializer.isImmutableType --- End diff -- `false` > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130051854 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. + * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { --- End diff -- `with ListState[T]` or `with AppendingState[T, JIterable[T]]`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---