[jira] [Commented] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-07-28 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106034#comment-16106034
 ] 

Aljoscha Krettek commented on FLINK-6996:
-

[~till.rohrmann] The log is not accessible (at least for me).

> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html])
> it will not flush the data on "snapshotState" as it is supposed to.
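The at-least-once contract described above can be sketched with a minimal stand-in for the interface (names and the simplified signature here are illustrative, not Flink's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the CheckpointedFunction contract (the real Flink
// interface passes context objects; this is a simplified illustration).
// The point: a buffering producer must flush pending records in
// snapshotState so a checkpoint never completes while data is unsent.
interface CheckpointedSketch {
    void snapshotState(); // simplified signature, for illustration only
}

class BufferingSink implements CheckpointedSketch {
    private final List<String> pending = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();

    // Called once per record, like a sink's invoke(); buffers for batching.
    void invoke(String record) {
        pending.add(record);
    }

    // Flushing here is exactly the step the issue says is skipped when the
    // producer is used as a plain sink function.
    @Override
    public void snapshotState() {
        delivered.addAll(pending);
        pending.clear();
    }
}
```

Without the flush in `snapshotState`, records buffered between checkpoints could be lost on failure even though the checkpoint succeeded.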



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-28 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106008#comment-16106008
 ] 

Yueting Chen commented on FLINK-7169:
-

Hi [~dawidwys], 
About the empty match issue, I'm not sure I get your point; could you show 
me an example if possible?

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's 
> how the CEP library currently behaves.
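The four skip options listed above can be sketched with a small, self-contained model (the enum and method names here are hypothetical, not the proposed Flink API; rows are identified by their index in the input):

```java
import java.util.List;
import java.util.Map;

// Illustrative model of the four AFTER MATCH SKIP options.
enum SkipStrategy { TO_NEXT_ROW, PAST_LAST_ROW, TO_FIRST, TO_LAST }

class SkipResolver {
    // Returns the row index at which pattern matching resumes after a match.
    static int resumeIndex(List<Integer> matchedRows,
                           Map<String, List<Integer>> rowsByVariable,
                           SkipStrategy strategy,
                           String variable) {
        switch (strategy) {
            case TO_NEXT_ROW:   // row after the first row of the match
                return matchedRows.get(0) + 1;
            case PAST_LAST_ROW: // row after the last row of the match
                return matchedRows.get(matchedRows.size() - 1) + 1;
            case TO_FIRST:      // first row mapped to the variable (RPV)
                return rowsByVariable.get(variable).get(0);
            case TO_LAST: {     // last row mapped to the variable (RPV)
                List<Integer> rows = rowsByVariable.get(variable);
                return rows.get(rows.size() - 1);
            }
            default:
                throw new IllegalArgumentException("unknown strategy");
        }
    }
}
```

For a match spanning rows 3..6 with DOWN mapped to rows 4 and 5, `TO_NEXT_ROW` resumes at row 4, `PAST_LAST_ROW` at row 7, `TO_FIRST DOWN` at row 4, and `TO_LAST DOWN` at row 5.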



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7292) Fix EMPTY MATCH bug in CEP.

2017-07-28 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106007#comment-16106007
 ] 

Yueting Chen commented on FLINK-7292:
-

Hi [~litrain],
Thanks for bringing it up, but I don't think the result should contain an empty 
match in your example. In general, {{a?}} means we should first try to find a 
single match for {{a}}; if that fails, then an empty result should be generated. 
I think for the match pattern {{a?}} and the input events {{b1,a1}}, the result 
should be {{\[empty match],a1}}.

IMO, the only purpose an empty result serves is to indicate that a match 
attempt was processed. Maybe in some use cases that is useful.
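The {{a?}} semantics discussed above (prefer a single match for {{a}}, fall back to an empty match) can be illustrated with a toy matcher, independent of Flink's NFA (the per-position greedy rule here models the discussion, not Flink's implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of greedy optional matching ("a?"): at each input
// position, prefer a one-event match for "a"; if none is possible, emit
// an empty match instead.
class OptionalMatcher {
    static List<List<String>> matchOptionalA(List<String> events) {
        List<List<String>> results = new ArrayList<>();
        for (String event : events) {
            if (event.startsWith("a")) {
                results.add(List.of(event)); // single match for "a"
            } else {
                results.add(List.of());      // empty match
            }
        }
        return results;
    }
}
```

For input {{b1,a1}} this yields an empty match followed by {{a1}}, matching the expected result above.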

> Fix EMPTY MATCH bug in CEP.
> ---
>
> Key: FLINK-7292
> URL: https://issues.apache.org/jira/browse/FLINK-7292
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: zhangxiaoyu
>
> Currently, with the pattern {quote}a?{quote} and the event {quote}a1{quote}, 
> the result is only {quote}a1{quote} without the empty match.
> We wish the empty match to also be returned, and I am working on this issue 
> now.
> My method is to check whether an empty match exists only when the first 
> event comes (at the StartState): try to traverse the PROCEED edges with the 
> trueFunction condition from the StartState, and if a FinalState can be 
> reached, add an empty list to the result.
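The reachability check described above can be sketched as a plain graph traversal (state names and the edge map are illustrative placeholders, not Flink's actual NFA classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: follow only PROCEED edges (whose condition is the
// always-true function) from the start state and report whether a final
// state is reachable. If it is, an empty match is possible.
class EmptyMatchCheck {
    static boolean emptyMatchPossible(String startState,
                                      Set<String> finalStates,
                                      Map<String, List<String>> proceedEdges) {
        Deque<String> stack = new ArrayDeque<>(List.of(startState));
        Set<String> visited = new HashSet<>();
        while (!stack.isEmpty()) {
            String state = stack.pop();
            if (finalStates.contains(state)) {
                return true; // PROCEED-only path to a final state
            }
            if (visited.add(state)) {
                stack.addAll(proceedEdges.getOrDefault(state, List.of()));
            }
        }
        return false;
    }
}
```

For a pattern like {{a?}} the start state can PROCEED straight to the final state, so the check returns true; for a mandatory {{a}} there is no such path.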



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7293) Support custom order by in PatternStream

2017-07-28 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105963#comment-16105963
 ] 

Dian Fu edited comment on FLINK-7293 at 7/29/17 1:59 AM:
-

{quote}
Could you explain a bit why this is needed? 
{quote}
As we need to support clauses such as
{code}
SELECT *
FROM Ticker MATCH_RECOGNIZE (
 PARTITION BY symbol
 ORDER BY tstamp, price
 MEASURES  STRT.tstamp AS start_tstamp,
   LAST(DOWN.tstamp) AS bottom_tstamp,
   LAST(UP.tstamp) AS end_tstamp
 ONE ROW PER MATCH
 AFTER MATCH SKIP TO LAST UP
 PATTERN (STRT DOWN+ UP+)
 DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
 ) MR
{code}
There may be multiple columns, e.g. {{tstamp}} and {{price}}, to {{order by}}.

{quote}
I can't see a way to sort an unbounded stream of data  Could you elaborate a 
bit how do you see it working?
how this is going to play well with the Time semantics.
When both event-time and a custom order-by is used, who is going to win?
{quote}
This works in the same way as the implementation of {{sort by}} in the Table 
API. That is to say, both the event-time and the custom order-by will be used: 
the event-time is considered with higher priority and the custom order-by with 
lower priority. When both are used, incoming events are first ordered by event 
time; when a watermark arrives, the events before the watermark that share the 
same event time are ordered by the custom order-by before being emitted 
(please refer to 
[DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala]
 for more details).

Thoughts?
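The two-level ordering described above can be sketched independently of Flink (the record fields and class names here are illustrative, keyed to the MATCH_RECOGNIZE example's `tstamp` and `price` columns):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the two-level ordering: events are buffered as they arrive,
// and when a watermark comes, every event at or before it is emitted
// sorted primarily by event time and secondarily by the custom order-by
// column (here: price).
class TwoLevelSorter {
    record Event(long tstamp, double price) {}

    private final List<Event> buffer = new ArrayList<>();

    void collect(Event e) {
        buffer.add(e);
    }

    List<Event> onWatermark(long watermark) {
        List<Event> ready = buffer.stream()
            .filter(e -> e.tstamp() <= watermark)
            .sorted(Comparator.comparingLong(Event::tstamp)     // primary
                              .thenComparingDouble(Event::price)) // secondary
            .collect(Collectors.toList());
        buffer.removeAll(ready); // keep only events past the watermark
        return ready;
    }
}
```

Event time always wins; the custom order-by only breaks ties among events that share the same timestamp, mirroring the DataStreamSort behavior referenced above.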




was (Author: dian.fu):
{quote}
Could you explain a bit why this is needed? 
{quote}
As we need to support clauses such as
{code}
SELECT *
FROM Ticker MATCH_RECOGNIZE (
 PARTITION BY symbol
 ORDER BY tstamp, price
 MEASURES  STRT.tstamp AS start_tstamp,
   LAST(DOWN.tstamp) AS bottom_tstamp,
   LAST(UP.tstamp) AS end_tstamp
 ONE ROW PER MATCH
 AFTER MATCH SKIP TO LAST UP
 PATTERN (STRT DOWN+ UP+)
 DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
 ) MR
{code}
There may be multiple columns to order by.

{quote}
I can't see a way to sort an unbounded stream of data  Could you elaborate a 
bit how do you see it working?
how this is going to play well with the Time semantics.
When both event-time and a custom order-by is used, who is going to win?
{quote}
This works in the same way as the implementation of {{sort by}} in the Table 
API. That is to say, both the event-time and the custom order-by will be used: 
the event-time should be considered with higher priority and the custom 
order-by with lower priority. When both are used, incoming events are first 
ordered by event time; when a watermark arrives, the events before the 
watermark that share the same event time are ordered by the custom order-by 
before being emitted (please refer to 
[DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala]
 for more details).

Thoughts?



> Support custom order by in PatternStream
> 
>
> Key: FLINK-7293
> URL: https://issues.apache.org/jira/browse/FLINK-7293
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, when {{ProcessingTime}} is configured, the events are fed to the NFA 
> in the order of their arrival time, and when {{EventTime}} is configured, the 
> events are fed to the NFA in the order of the event time. It should also 
> support a custom {{order by}} so that users can define the order of the 
> events beyond the above factors.





[jira] [Comment Edited] (FLINK-7293) Support custom order by in PatternStream

2017-07-28 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105963#comment-16105963
 ] 

Dian Fu edited comment on FLINK-7293 at 7/29/17 1:30 AM:
-

{quote}
Could you explain a bit why this is needed? 
{quote}
As we need to support clauses such as
{code}
SELECT *
FROM Ticker MATCH_RECOGNIZE (
 PARTITION BY symbol
 ORDER BY tstamp, price
 MEASURES  STRT.tstamp AS start_tstamp,
   LAST(DOWN.tstamp) AS bottom_tstamp,
   LAST(UP.tstamp) AS end_tstamp
 ONE ROW PER MATCH
 AFTER MATCH SKIP TO LAST UP
 PATTERN (STRT DOWN+ UP+)
 DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
 ) MR
{code}
There may be multiple columns to order by.

{quote}
I can't see a way to sort an unbounded stream of data  Could you elaborate a 
bit how do you see it working?
how this is going to play well with the Time semantics.
When both event-time and a custom order-by is used, who is going to win?
{quote}
This works in the same way as the implementation of {{sort by}} in the Table 
API. That is to say, both the event-time and the custom order-by will be used: 
the event-time should be considered with higher priority and the custom 
order-by with lower priority. When both are used, incoming events are first 
ordered by event time; when a watermark arrives, the events before the 
watermark that share the same event time are ordered by the custom order-by 
before being emitted (please refer to 
[DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala]
 for more details).

Thoughts?




was (Author: dian.fu):
{quote}
Could you explain a bit why this is needed? 
{quote}
As we need to support clauses such as
{code}
SELECT *
FROM Ticker MATCH_RECOGNIZE (
 PARTITION BY symbol
 ORDER BY tstamp, price
 MEASURES  STRT.tstamp AS start_tstamp,
   LAST(DOWN.tstamp) AS bottom_tstamp,
   LAST(UP.tstamp) AS end_tstamp
 ONE ROW PER MATCH
 AFTER MATCH SKIP TO LAST UP
 PATTERN (STRT DOWN+ UP+)
 DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
 ) MR
{code}
There may be multiple columns to order by.

{quote}
I can't see a way to sort an unbounded stream of data  Could you elaborate a 
bit how do you see it working?
how this is going to play well with the Time semantics.
When both event-time and a custom order-by is used, who is going to win?
{quote}
This works in the same way as the implementation of {{sort by}} in the Table 
API. That is to say, both the event-time and the custom order-by will be used: 
the event-time should be considered with higher priority and the custom 
order-by with lower priority. (Please refer to 
[DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala]
 for more details.)

Thoughts?



> Support custom order by in PatternStream
> 
>
> Key: FLINK-7293
> URL: https://issues.apache.org/jira/browse/FLINK-7293
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, when {{ProcessingTime}} is configured, the events are fed to the NFA 
> in the order of their arrival time, and when {{EventTime}} is configured, the 
> events are fed to the NFA in the order of the event time. It should also 
> support a custom {{order by}} so that users can define the order of the 
> events beyond the above factors.





[jira] [Commented] (FLINK-7293) Support custom order by in PatternStream

2017-07-28 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105963#comment-16105963
 ] 

Dian Fu commented on FLINK-7293:


{quote}
Could you explain a bit why this is needed? 
{quote}
As we need to support clauses such as
{code}
SELECT *
FROM Ticker MATCH_RECOGNIZE (
 PARTITION BY symbol
 ORDER BY tstamp, price
 MEASURES  STRT.tstamp AS start_tstamp,
   LAST(DOWN.tstamp) AS bottom_tstamp,
   LAST(UP.tstamp) AS end_tstamp
 ONE ROW PER MATCH
 AFTER MATCH SKIP TO LAST UP
 PATTERN (STRT DOWN+ UP+)
 DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
 ) MR
{code}
There may be multiple columns to order by.

{quote}
I can't see a way to sort an unbounded stream of data  Could you elaborate a 
bit how do you see it working?
how this is going to play well with the Time semantics.
When both event-time and a custom order-by is used, who is going to win?
{quote}
This works in the same way as the implementation of {{sort by}} in the Table 
API. That is to say, both the event-time and the custom order-by will be used: 
the event-time should be considered with higher priority and the custom 
order-by with lower priority. (Please refer to 
[DataStreamSort.scala|https://github.com/apache/flink/blob/b8c8f204de718e6d5b7c3df837deafaed7c375f5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala]
 for more details.)

Thoughts?



> Support custom order by in PatternStream
> 
>
> Key: FLINK-7293
> URL: https://issues.apache.org/jira/browse/FLINK-7293
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, when {{ProcessingTime}} is configured, the events are fed to the NFA 
> in the order of their arrival time, and when {{EventTime}} is configured, the 
> events are fed to the NFA in the order of the event time. It should also 
> support a custom {{order by}} so that users can define the order of the 
> events beyond the above factors.





[jira] [Closed] (FLINK-7281) Fix various issues in (Maven) release infrastructure

2017-07-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7281.
---
Resolution: Fixed

Implemented on master in
50a818b1bb74e9442478157bb10c0d5de05ad665
f1c92f2367183d7e215466b3ff227fff11efc470

Implemented on release-1.3 in
62b67876cec0ef13e35eec242a01029899b916b4
b5c9617b41eb38c104ea5c20f15a1f937c591b40

> Fix various issues in (Maven) release infrastructure
> 
>
> Key: FLINK-7281
> URL: https://issues.apache.org/jira/browse/FLINK-7281
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> I discovered a couple of issues while getting ready for release 1.3.2:
> * some old, misleading release scripts and release README
> * the _maven-release-plugin_ is not correctly configured for doing actual 
> releases with it
> * the quickstarts are not configured to depend on the project version and 
> thus require manual updating, also of the slf4j and log4j versions
> * the _maven-javadoc-plugin_ configuration does not work when using the 
> _maven-release-plugin_; that is, we have to move the config to the plugin 
> section and out of the _release_ profile





[jira] [Closed] (FLINK-7290) Make release scripts modular

2017-07-28 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7290.
---
Resolution: Fixed

Implemented on master in
18733d82e694997e229afb50f13c21fcc1c65729

Implemented on release-1.3 in
084c59e0ec7b0800d2612b23e702a9064fe66aac

> Make release scripts modular
> 
>
> Key: FLINK-7290
> URL: https://issues.apache.org/jira/browse/FLINK-7290
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.3.0, 1.3.1, 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0, 1.3.2
>
>
> The current release script _create_release_files.sh_ is one monolithic script 
> that creates a release branch, changes versions in POMs and documentation, 
> creates a release commit (but not a release tag), creates a source release, 
> pushes Scala 2.10 and Scala 2.11 artefacts to maven, creates binary 
> convenience releases for various Hadoop and Scala versions, stages the source 
> and binary releases for release voting.
> If anything goes wrong in the release process, modification (or a complete 
> start-over) of the process is required. I'm proposing to create a set of 
> modular release scripts that each perform a given action. (Actually, I would 
> like to use the _maven-release-plugin_ for that, but this would require more 
> work and doesn't work well for releasing with different Scala versions.) 
> I'm proposing this set of scripts:
>  * _create_release_branch.sh_: Branch off for a new release, update versions in 
> POMs and docs, create release tag.
>  * _create_source_release.sh_: Self-explanatory
>  * _deploy_staging_jars.sh_: Self-explanatory
>  * _create_binary_release.sh_: Create a binary release for a specific version 
> or for a whole matrix of versions.
> Also, having the modular scripts allows, for example, creating the binary 
> releases (which is time-consuming) on a VM somewhere, then fetching them to 
> the local machine and signing them there. I.e., this doesn't require putting a 
> private key and passphrase on a remote machine.





[jira] [Commented] (FLINK-7281) Fix various issues in (Maven) release infrastructure

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105552#comment-16105552
 ] 

ASF GitHub Bot commented on FLINK-7281:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4407
  
@zentol Yes, I'm planning to remove the old script and the documentation 
for the new scripts is taking shape here: 
https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

The scripts are not yet described there but I will fill this in as I go. I 
think it's already quite the improvement compared to the old release "guide": 
https://cwiki.apache.org/confluence/display/FLINK/Releasing


> Fix various issues in (Maven) release infrastructure
> 
>
> Key: FLINK-7281
> URL: https://issues.apache.org/jira/browse/FLINK-7281
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> I discovered a couple of issues while getting ready for release 1.3.2:
> * some old, misleading release scripts and release README
> * the _maven-release-plugin_ is not correctly configured for doing actual 
> releases with it
> * the quickstarts are not configured to depend on the project version and 
> thus require manual updating, also of the slf4j and log4j versions
> * the _maven-javadoc-plugin_ configuration does not work when using the 
> _maven-release-plugin_; that is, we have to move the config to the plugin 
> section and out of the _release_ profile





[GitHub] flink issue #4407: [FLINK-7281] Fix various issues in (Maven) release infras...

2017-07-28 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4407
  
@zentol Yes, I'm planning to remove the old script and the documentation 
for the new scripts is taking shape here: 
https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

The scripts are not yet described there but I will fill this in as I go. I 
think it's already quite the improvement compared to the old release "guide": 
https://cwiki.apache.org/confluence/display/FLINK/Releasing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7281) Fix various issues in (Maven) release infrastructure

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105550#comment-16105550
 ] 

ASF GitHub Bot commented on FLINK-7281:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4407


> Fix various issues in (Maven) release infrastructure
> 
>
> Key: FLINK-7281
> URL: https://issues.apache.org/jira/browse/FLINK-7281
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> I discovered a couple of issues while getting ready for release 1.3.2:
> * some old, misleading release scripts and release README
> * the _maven-release-plugin_ is not correctly configured for doing actual 
> releases with it
> * the quickstarts are not configured to depend on the project version and 
> thus require manual updating, also of the slf4j and log4j versions
> * the _maven-javadoc-plugin_ configuration does not work when using the 
> _maven-release-plugin_; that is, we have to move the config to the plugin 
> section and out of the _release_ profile





[jira] [Commented] (FLINK-7281) Fix various issues in (Maven) release infrastructure

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105549#comment-16105549
 ] 

ASF GitHub Bot commented on FLINK-7281:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4407
  
Merged


> Fix various issues in (Maven) release infrastructure
> 
>
> Key: FLINK-7281
> URL: https://issues.apache.org/jira/browse/FLINK-7281
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> I discovered a couple of issues while getting ready for release 1.3.2:
> * some old, misleading release scripts and release README
> * the _maven-release-plugin_ is not correctly configured for doing actual 
> releases with it
> * the quickstarts are not configured to depend on the project version and 
> thus require manual updating, also of the slf4j and log4j versions
> * the _maven-javadoc-plugin_ configuration does not work when using the 
> _maven-release-plugin_; that is, we have to move the config to the plugin 
> section and out of the _release_ profile





[GitHub] flink pull request #4407: [FLINK-7281] Fix various issues in (Maven) release...

2017-07-28 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4407




[GitHub] flink issue #4407: [FLINK-7281] Fix various issues in (Maven) release infras...

2017-07-28 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4407
  
Merged




[GitHub] flink issue #4400: [FLINK-7253] [tests] Remove CommonTestUtils#assumeJava8

2017-07-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4400
  
Changes look good but unused imports checkstyle violation.




[jira] [Commented] (FLINK-7253) Remove all 'assume Java 8' code in tests

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105545#comment-16105545
 ] 

ASF GitHub Bot commented on FLINK-7253:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4400
  
Changes look good but unused imports checkstyle violation.


> Remove all 'assume Java 8' code in tests
> 
>
> Key: FLINK-7253
> URL: https://issues.apache.org/jira/browse/FLINK-7253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-7250) Drop the jdk8 build profile

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105543#comment-16105543
 ] 

ASF GitHub Bot commented on FLINK-7250:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4399
  
LGTM


> Drop the jdk8 build profile
> ---
>
> Key: FLINK-7250
> URL: https://issues.apache.org/jira/browse/FLINK-7250
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>






[GitHub] flink issue #4399: [FLINK-7250] [build] Remove jdk8 profile

2017-07-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4399
  
LGTM




[jira] [Commented] (FLINK-7249) Bump Java version in build plugin

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105542#comment-16105542
 ] 

ASF GitHub Bot commented on FLINK-7249:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4398
  
+1 to merge

We can also simplify setting the garbage collector in 
`flink-dist/src/main/flink-bin/bin/taskmanager.sh` and including the 
`flink-connector-elasticsearch5` module in `flink-connectors/pom.xml`.


> Bump Java version in build plugin
> -
>
> Key: FLINK-7249
> URL: https://issues.apache.org/jira/browse/FLINK-7249
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
> Fix For: 1.4.0
>
>






[GitHub] flink issue #4398: [FLINK-7249] [build] Bump java.version property to 1.8

2017-07-28 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4398
  
+1 to merge

We can also simplify setting the garbage collector in 
`flink-dist/src/main/flink-bin/bin/taskmanager.sh` and including the 
`flink-connector-elasticsearch5` module in `flink-connectors/pom.xml`.




[jira] [Commented] (FLINK-7092) Shutdown ResourceManager components properly

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105224#comment-16105224
 ] 

ASF GitHub Bot commented on FLINK-7092:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4289


> Shutdown ResourceManager components properly
> 
>
> Key: FLINK-7092
> URL: https://issues.apache.org/jira/browse/FLINK-7092
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{MesosResourceManager}} starts internally a {{TaskMonitor}}, 
> {{LaunchCoordinator}}, {{ConnectionMonitor}} and a 
> {{ReconciliationCoordinator}}. These components have to be properly shut down 
> when the {{MesosResourceManager}} closes.





[jira] [Assigned] (FLINK-7279) MiniCluster can deadlock at shut down

2017-07-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7279:


Assignee: Nico Kruber

> MiniCluster can deadlock at shut down
> -
>
> Key: FLINK-7279
> URL: https://issues.apache.org/jira/browse/FLINK-7279
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>  Labels: flip-6
>
> The {{MiniCluster}} can deadlock if the fatal error handler is called 
> while the {{MiniCluster}} shuts down. The reason is that the shutdown 
> happens under a lock which is required by the fatal error handler as well. If 
> the {{MiniCluster}} then tries to shut down the underlying RPC service, which 
> waits for all actors to terminate, it will never complete because one actor 
> is still waiting for the lock.
> One solution would be to ignore the fatal error handler calls if the 
> {{MiniCluster}} is shutting down.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/257811319/log.txt
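The proposed fix (ignore fatal-error calls once shutdown has begun) can be sketched as follows; class and method names are illustrative, not Flink's actual MiniCluster code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the proposed fix: a shutdown flag is set before taking the
// shutdown lock, and the fatal error handler returns early when the flag
// is set, so it never contends for the lock during shutdown.
class MiniClusterSketch {
    private final Object lock = new Object();
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    volatile boolean fatalErrorHandled = false;

    void shutDown() {
        shuttingDown.set(true); // set BEFORE acquiring the lock
        synchronized (lock) {
            // ... stop RPC services and wait for actors to terminate ...
        }
    }

    void onFatalError(Throwable cause) {
        if (shuttingDown.get()) {
            return; // ignore: shutdown is already in progress
        }
        synchronized (lock) {
            fatalErrorHandled = true; // ... normal fatal-error handling ...
        }
    }
}
```

Because the handler bails out without touching the lock, the RPC service's actors can terminate and shutdown completes.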





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130053061
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
--- End diff --

```
ListView provides List functionality for accumulators used by user-defined 
aggregate functions {{AggregateFunction}}. 
A ListView can be backed by a Java ArrayList or a state backend, depending 
on the context in which the function is used. 

At runtime `ListView` will be replaced by a {@link StateListView} or a 
{@link HeapListView}. 
Hence, `ListView`'s methods do not need to be implemented.
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105182#comment-16105182
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130038499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to 
care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a 
{@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
--- End diff --

ScalaDocs are not formatted with HTML: 
http://docs.scala-lang.org/style/scaladoc.html


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130119984
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -19,13 +19,20 @@
 package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.table.dataview.{DataViewFactory, HeapViewFactory}
 import org.apache.flink.types.Row
 
 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
 
+  var factory: DataViewFactory = new HeapViewFactory()
+
+  def getDataViewFactory: DataViewFactory = factory
+
+  def setDataViewFactory(factory: DataViewFactory): Unit = this.factory = 
factory
--- End diff --

I think we should rather add a method `initialize(ctx: RuntimeContext)` and 
generate code to register the state in this method. 

IMO, the `DataViewFactory` is also not required, because 
1. we can code-gen all of that functionality
2. we can make heap the default for `MapView` and `ListView` such that we 
only need to replace it if it needs to be backed by state. So there would be 
only one implementation of `DataViewFactory`




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105196#comment-16105196
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130071629
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
--- End diff --

this should be `obj.asInstanceOf[ListViewSerializer[_]].listSerializer`
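The fix the comment asks for can be illustrated with a minimal wrapper type (the names below are invented for the sketch, not Flink's classes): `equals` must unwrap the *other* wrapper and compare the inner serializers, instead of casting the wrapper itself to the inner serializer type. The sketch also passes `obj` rather than `this` to `canEqual`, which the quoted code arguably should as well:

```java
// Minimal stand-in for a serializer that delegates to an inner serializer,
// illustrating the equals fix requested in the review. Names are invented.
final class WrapperSerializer {

    final Object innerSerializer; // stands in for the wrapped ListSerializer

    WrapperSerializer(Object innerSerializer) {
        this.innerSerializer = innerSerializer;
    }

    boolean canEqual(Object obj) {
        return obj != null && obj.getClass() == getClass();
    }

    @Override
    public boolean equals(Object obj) {
        // unwrap the *other* wrapper, then compare the inner serializers
        return canEqual(obj)
            && innerSerializer.equals(((WrapperSerializer) obj).innerSerializer);
    }

    @Override
    public int hashCode() {
        return innerSerializer.hashCode();
    }
}
```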


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105185#comment-16105185
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130057272
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView encapsulates the operation of map.
+  *
+  * All methods in this class are not implemented, users do not need to 
care about whether it is
+  * backed by Java HashMap or state backend. It will be replaced by a 
{@link StateMapView} or a
+  * {@link HeapMapView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *//Overloaded accumulate method
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
--- End diff --

Most comments on `ListState` apply here as well.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130092445
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
+  val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], 
mapView, "valueTypeInfo")
+  if (keyTypeInfo != null && valueTypeInfo != null) {
+  resultMap.put(field.getName, new MapViewTypeInfo(
+keyTypeInfo.asInstanceOf[TypeInformation[_]],
+valueTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+} else if (field.getType == classOf[ListView[_]]) {
+  val listView = field.get(acc)
+  val elementTypeInfo = getFieldValue(classOf[ListView[_]], 
listView, "elementTypeInfo")
+  if (elementTypeInfo != null) {
+resultMap.put(field.getName,
+  new 
ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+}
+  }
+}
+
+resultMap
+  }
+
+
+  /**
+* Extract data view specification.
+*
+* @param index aggregate function index
+* @param aggFun aggregate function
+* @param accType accumulator type information
+* @param dataViewTypes data view fields types
+* @param isUseState is use state
--- End diff --

The comment is not helpful. Please add more detailed parameter descriptions.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130059458
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * Heap view factory to create [[HeapListView]] or [[HeapMapView]].
+  *
+  */
+class HeapViewFactory() extends DataViewFactory() {
+
+  override protected def createListView[T](id: String): ListView[T] = new 
HeapListView[T]
+
+  override protected def createMapView[K, V](id: String): MapView[K, V] = 
new HeapMapView[K, V]
+}
+
+class HeapListView[T] extends ListView[T] {
--- End diff --

Could this be the default implementation of `ListView`?




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130059497
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * Heap view factory to create [[HeapListView]] or [[HeapMapView]].
+  *
+  */
+class HeapViewFactory() extends DataViewFactory() {
+
+  override protected def createListView[T](id: String): ListView[T] = new 
HeapListView[T]
+
+  override protected def createMapView[K, V](id: String): MapView[K, V] = 
new HeapMapView[K, V]
+}
+
+class HeapListView[T] extends ListView[T] {
+
+  val list = new util.ArrayList[T]()
+
+  def this(t: util.List[T]) = {
+this()
+list.addAll(t)
+  }
+
+  override def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  override def add(value: T): Unit = list.add(value)
+
+  override def clear(): Unit = list.clear()
+}
+
+class HeapMapView[K, V] extends MapView[K, V] {
--- End diff --

Could this be the default implementation of `MapView`?




[jira] [Created] (FLINK-7297) Instable Kafka09ProducerITCase.testCustomPartitioning test case

2017-07-28 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7297:


 Summary: Instable Kafka09ProducerITCase.testCustomPartitioning 
test case
 Key: FLINK-7297
 URL: https://issues.apache.org/jira/browse/FLINK-7297
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Tests
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Priority: Critical


There seems to be a test instability of 
{{Kafka09ProducerITCase>KafkaProducerTestBase.testCustomPartitioning}} on 
Travis.

https://s3.amazonaws.com/archive.travis-ci.org/jobs/258538636/log.txt





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105186#comment-16105186
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130040193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to 
care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a 
{@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  def get: JIterable[T] = throw new 
UnsupportedOperationException("Unsupported operation!")
--- End diff --

is this Java's `java.lang.UnsupportedOperationException`?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105206#comment-16105206
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130117980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -108,30 +112,38 @@ object AggregateUtil {
   outputArity,
   needRetract = false,
   needMerge = false,
-  needReset = false
+  needReset = false,
+  accConfig = Some(DataViewConfig(accSpecs, isUseState))
 )
 
+val accConfig = accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
 if (isRowTimeType) {
   if (isRowsClause) {
 // ROWS unbounded over process function
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
   CRowTypeInfo(inputTypeInfo),
-  queryConfig)
+  queryConfig,
+  accConfig)
--- End diff --

I think it would be better to keep `accConfig` out of the ProcessFunctions. 
It is just passing information to the `GeneratedAggregations` which could also 
be code generated. The only thing that we need is a `RuntimeContext` in 
`GeneratedAggregations`. Therefore, I propose to add a method 
`GeneratedAggregations.initialize(ctx: RuntimeContext())` instead of adding 
`GeneratedAggregations.setDataViewFactory()`. In `initialize()` we can generate 
code that registers all necessary state by itself and keeps it as member 
variables.

I think this would be cleaner because it encapsulates everything that's 
related to aggregation functions in the code-gen'd class. 

If we use heap state in `MapView` and `ListView` as default, we also won't 
need `DataViewFactory` because we can generate all state access directly (if 
required).
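The `initialize(ctx)` proposal can be sketched roughly as follows. Everything here is illustrative: `GeneratedAggregationsSketch` and `RuntimeContextStub` are invented stand-ins for Flink's `GeneratedAggregations` and `RuntimeContext`, and the real class body would be code-generated Scala rather than hand-written Java:

```java
import java.util.HashMap;
import java.util.Map;

// Invented stand-in for Flink's RuntimeContext: it merely records which state
// names were registered, which is all this sketch needs.
class RuntimeContextStub {
    final Map<String, Object> registeredState = new HashMap<>();

    <S> S getOrRegisterState(String name, S initial) {
        registeredState.put(name, initial);
        return initial;
    }
}

// The reviewer's idea: instead of injecting a DataViewFactory, the generated
// aggregations class receives the runtime context once and registers its own
// state, keeping the handles as member fields.
abstract class GeneratedAggregationsSketch {
    public abstract void initialize(RuntimeContextStub ctx);
}

// What generated code for a count-distinct style accumulator might look like.
class CountDistinctAggregations extends GeneratedAggregationsSketch {
    Map<String, Integer> idMap; // stands in for a registered MapState

    @Override
    public void initialize(RuntimeContextStub ctx) {
        idMap = ctx.getOrRegisterState("acc0-map", new HashMap<String, Integer>());
    }
}
```

With this shape, the ProcessFunction only needs to call `initialize()` once in `open()`, and no factory object has to travel through the runtime operators.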


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130058705
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/DataViewFactory.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * Factory to create [[ListView]] or [[MapView]].
+  *
+  */
+abstract class DataViewFactory() extends Serializable {
--- End diff --

Would we need this if we only need to replace the view if it is backed by a 
state backend?




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105213#comment-16105213
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130073596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copied and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
+  : CompatibilityResult[ListView[T]] = {
+configSnapshot match {
+  case snapshot: CollectionSerializerConfigSnapshot[_] =>
+val previousKvSerializersAndConfigs = 
snapshot.getNestedSerializersAndConfigs
--- End diff --

Change name to `previousListSerializerAndConfig`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter
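As a rough illustration of the length-prefixed format described in the `ListViewSerializer` scaladoc above, here is a minimal self-contained sketch. It is not Flink code: `DataOutputStream`/`DataInputStream` stand in for Flink's `DataOutputView`/`DataInputView`, and the element type is fixed to `Long` for brevity.

```java
import java.io.*;
import java.util.*;

class ListFormatSketch {
    // Write a 4-byte size prefix, then each element's payload.
    static byte[] serialize(List<Long> list) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(list.size());      // four bytes for the length of the list
            for (long element : list) {
                out.writeLong(element);     // serialized representation of each element
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // byte-array streams do not actually throw
        }
    }

    // Read the size prefix back, then that many elements.
    static List<Long> deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            List<Long> list = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                list.add(in.readLong());
            }
            return list;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

An empty list serializes to exactly four bytes (the size prefix), which is why the real serializer reports a variable length rather than a fixed one.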

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105198#comment-16105198
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130119984
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -19,13 +19,20 @@
 package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.table.dataview.{DataViewFactory, HeapViewFactory}
 import org.apache.flink.types.Row
 
 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
 
+  var factory: DataViewFactory = new HeapViewFactory()
+
+  def getDataViewFactory: DataViewFactory = factory
+
+  def setDataViewFactory(factory: DataViewFactory): Unit = this.factory = 
factory
--- End diff --

I think we should rather add a method `initialize(ctx: RuntimeContext)` and 
generate code to register the state in this method. 

IMO, the `DataViewFactory` is also not required, because 
1. we can code-gen all of that functionality
2. we can make heap the default for `MapView` and `ListView` such that we 
only need to replace it if it needs to be backed by state. So there would be 
only one implementation of `DataViewFactory`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
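The `initialize(ctx: RuntimeContext)` approach suggested in the review comment above can be sketched with plain Java stand-ins. Everything here is hypothetical: `RuntimeCtx`, `MyGeneratedAgg`, and the state name `"acc0_list"` are simplified placeholders, not Flink's actual code-generation output.

```java
import java.util.*;

// Hypothetical stand-in for Flink's RuntimeContext: hands out named,
// persistent state objects, creating them on first access.
class RuntimeCtx {
    private final Map<String, Object> state = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> T getState(String name, T initialValue) {
        return (T) state.computeIfAbsent(name, key -> initialValue);
    }
}

// Sketch of the suggestion: the generated class gets an initialize(ctx) hook
// and registers its state-backed views there, so no factory setter is needed.
abstract class GeneratedAggregationsSketch {
    abstract void initialize(RuntimeCtx ctx);
}

class MyGeneratedAgg extends GeneratedAggregationsSketch {
    List<String> listView;

    @Override
    void initialize(RuntimeCtx ctx) {
        // Code generation would emit one registration per data view field.
        listView = ctx.getState("acc0_list", new ArrayList<String>());
    }
}
```

Two generated instances initialized with the same context then see the same backing state, which is the behavior a state-backed view needs across restores.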


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130039762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView encapsulates the operations of a map.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the view is backed by a Java HashMap or by the state backend.
+  * It will be replaced by a {@link StateMapView} or a {@link HeapMapView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *//Overloaded accumulate method
--- End diff --

`accumulate()` is not overloaded here.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130112099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Get data view type information from the accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
--- End diff --

Couldn't you instead take the accumulator type information and use the 
field information?


---
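The reflective extraction discussed above (scanning the accumulator's declared fields for `DataView` subtypes) can be sketched in miniature. `DataView`, `MapView`, and `MyAccum` below are toy stand-ins for the real Flink types, kept only to demonstrate the `isAssignableFrom` scan from the quoted snippet.

```java
import java.lang.reflect.Field;
import java.util.*;

// Toy stand-ins for the Flink types, just to demonstrate the reflective scan.
interface DataView {}
class MapView implements DataView {}

class MyAccum {
    public MapView map = new MapView();
    public long count;
}

class FieldScanSketch {
    // Collect the names of all DataView-typed fields declared on an accumulator
    // class, mirroring the isAssignableFrom check in the quoted diff.
    static List<String> dataViewFields(Class<?> accClass) {
        List<String> names = new ArrayList<>();
        for (Field field : accClass.getDeclaredFields()) {
            if (DataView.class.isAssignableFrom(field.getType())) {
                names.add(field.getName());
            }
        }
        return names;
    }
}
```

The review comment points out that the same information could come from the accumulator's type information instead of raw reflection; this sketch only shows what the reflective variant does.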


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130038328
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the view is backed by a Java ArrayList or by the state backend.
+  * It will be replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
--- End diff --

Shouldn't this be `ListViewTypeInfoFactory[T]`?


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130085778
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[HeapMapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V])
--- End diff --

Add a test based on `SerializerTestBase`


---
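The null-marker layout described in the `MapViewSerializer` scaladoc above (a size prefix, then each key followed by a null flag and, only if non-null, the value) can be sketched as follows. This is an illustrative approximation using JDK streams, not Flink's actual `MapSerializer`; key and value types are fixed to `String`/`Integer` for brevity.

```java
import java.io.*;
import java.util.*;

class MapFormatSketch {
    // Write a 4-byte size prefix, then each key-value pair; every value is
    // preceded by a boolean null marker so that null values round-trip.
    static byte[] serialize(Map<String, Integer> map) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(map.size());
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                if (entry.getValue() == null) {
                    out.writeBoolean(true);         // null marker set, no payload
                } else {
                    out.writeBoolean(false);
                    out.writeInt(entry.getValue());
                }
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<String, Integer> deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            Map<String, Integer> map = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                Integer value = null;
                if (!in.readBoolean()) {            // marker false => a value follows
                    value = in.readInt();
                }
                map.put(key, value);
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```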


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105200#comment-16105200
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130115155
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Get data view type information from the accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
+  val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], 
mapView, "valueTypeInfo")
+  if (keyTypeInfo != null && valueTypeInfo != null) {
+  resultMap.put(field.getName, new MapViewTypeInfo(
+keyTypeInfo.asInstanceOf[TypeInformation[_]],
+valueTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+} else if (field.getType == classOf[ListView[_]]) {
+  val listView = field.get(acc)
+  val elementTypeInfo = getFieldValue(classOf[ListView[_]], 
listView, "elementTypeInfo")
+  if (elementTypeInfo != null) {
+resultMap.put(field.getName,
+  new 
ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+}
+  }
+}
+
+resultMap
+  }
+
+
+  /**
+* Extract data view specification.
+*
+* @param index aggregate function index
+* @param aggFun aggregate function
+* @param accType accumulator type information
+* @param dataViewTypes data view fields types
+* @param isUseState is use state
+* @return the data view specification
+*/
+  def extractDataViewTypeInfo(
--- End diff --

Rename method to `removeStateViewFieldsFromAccTypeInfo`?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.







[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130086721
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
+class  MapViewTypeInfo[K, V](
+val keyType: TypeInformation[K],
+val valueType: TypeInformation[V])
+  extends TypeInformation[MapView[K, V]] {
+
+  @PublicEvolving
+  override def isBasicType = false
+
+  @PublicEvolving
+  override def isTupleType = false
+
+  @PublicEvolving
+  override def getArity = 0
+
+  @PublicEvolving
+  override def getTotalFields = 2
--- End diff --

should be `1`


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130057272
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView encapsulates the operations of a map.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the view is backed by a Java HashMap or by the state backend.
+  * It will be replaced by a {@link StateMapView} or a {@link HeapMapView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *//Overloaded accumulate method
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
--- End diff --

Most comments on `ListState` apply here as well.


---




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130115757
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -82,11 +84,13 @@ object AggregateUtil {
   isRowsClause: Boolean)
 : ProcessFunction[CRow, CRow] = {
 
-val (aggFields, aggregates, accTypes) =
+val isUseState = true
--- End diff --

rename to `isStateBackedDataViews`?


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130086377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[HeapMapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = mapSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+new HeapMapView[K, V](mapSerializer.createInstance())
+  }
+
+  override def copy(from: MapView[K, V]): MapView[K, V] = {
+val map = from.asInstanceOf[HeapMapView[K, V]].map
+new HeapMapView[K, V](mapSerializer.copy(map))
+  }
+
+  override def copy(from: MapView[K, V], reuse: MapView[K, V]): MapView[K, 
V] = copy(from)
+
+  override def getLength: Int = -1  // var length
+
+  override def serialize(record: MapView[K, V], target: DataOutputView): 
Unit = {
+val map = record.asInstanceOf[HeapMapView[K, V]].map
+mapSerializer.serialize(map, target)
+  }
+
+  override def deserialize(source: DataInputView): MapView[K, V] =
+new HeapMapView[K, V](mapSerializer.deserialize(source))
+
+  override def deserialize(reuse: MapView[K, V], source: DataInputView): 
MapView[K, V] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+mapSerializer.copy(source, target)
+
+  override def canEqual(obj: Any): Boolean = obj != null && obj.getClass 
== getClass
+
+  override def hashCode(): Int = mapSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+mapSerializer.equals(obj.asInstanceOf[MapSerializer[_, _]])
--- End diff --

should be `obj.asInstanceOf[MapViewSerializer[_, _]].map`
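The pattern this comment points at can be sketched in isolation. The classes below are hypothetical stand-ins, not Flink's actual serializers: the point is that `equals` should guard with `canEqual(obj)` (the quoted code calls `canEqual(this)`, which is always true) and then compare the wrapped serializer of the *other* instance rather than casting the raw argument.

```java
// Hypothetical stand-ins for the Flink classes, illustrating the fix the
// review comment asks for: guard with canEqual(obj), then compare the
// wrapped serializer of the other instance.
final class InnerSerializer {
    private final int id;

    InnerSerializer(int id) { this.id = id; }

    @Override
    public boolean equals(Object o) {
        return o instanceof InnerSerializer && ((InnerSerializer) o).id == id;
    }

    @Override
    public int hashCode() { return id; }
}

final class ViewSerializer {
    final InnerSerializer inner;

    ViewSerializer(InnerSerializer inner) { this.inner = inner; }

    boolean canEqual(Object obj) {
        return obj != null && obj.getClass() == getClass();
    }

    @Override
    public boolean equals(Object obj) {
        // Guard on the argument, then compare the wrapped serializers.
        return canEqual(obj) && inner.equals(((ViewSerializer) obj).inner);
    }

    @Override
    public int hashCode() { return inner.hashCode(); }
}

public class EqualsSketch {
    public static void main(String[] args) {
        ViewSerializer a = new ViewSerializer(new InnerSerializer(1));
        System.out.println(a.equals(new ViewSerializer(new InnerSerializer(1)))); // true
        System.out.println(a.equals(new ViewSerializer(new InnerSerializer(2)))); // false
        System.out.println(a.equals("not a serializer")); // false: canEqual rejects it
    }
}
```

With `canEqual(this)` the guard never rejects anything, so passing a non-serializer argument would blow up in the cast instead of returning `false`.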




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130072175
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copied and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
--- End diff --

Please align as:

```
override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot): 
CompatibilityResult[ListView[T]] = {

  configSnapshot match {
...
  }
}

```




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130075088
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.ListSerializer
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * [[ListView]] type information.
+  *
+  * @param elementType element type information
+  * @tparam T element type
+  */
+@PublicEvolving
+class  ListViewTypeInfo[T](val elementType: TypeInformation[T])
+  extends TypeInformation[ListView[T]] {
+
+  override def isBasicType: Boolean = false
+
+  override def isTupleType: Boolean = false
+
+  override def getArity: Int = 0
--- End diff --

This must be `1`.
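A hedged illustration of the convention behind this comment, with a stub interface standing in for Flink's `TypeInformation` (not the real class): a non-composite type information describes exactly one atomic field, so its arity should be 1, not 0.

```java
// Stub interface standing in for Flink's TypeInformation; the real type
// infos implement many more methods.
interface TypeInfoSketch {
    int getArity();       // number of fields of this type
    int getTotalFields(); // number of logical fields, counting nested types
}

// A view is treated as one atomic value, not a composite of zero fields.
class ListViewTypeInfoSketch implements TypeInfoSketch {
    @Override
    public int getArity() { return 1; }

    @Override
    public int getTotalFields() { return 1; }
}

public class AritySketch {
    public static void main(String[] args) {
        TypeInfoSketch info = new ListViewTypeInfoSketch();
        System.out.println(info.getArity() + "/" + info.getTotalFields()); // 1/1
    }
}
```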




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130054412
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether it is backed by a Java ArrayList or by the state backend.
+  * It will be replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list, or {@code null} if the list is empty.
+*/
+  def get: JIterable[T] = throw new 
UnsupportedOperationException("Unsupported operation!")
+
+  /**
+* Adds the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = throw new 
UnsupportedOperationException("Unsupported operation!")
--- End diff --

Just a thought: how about we implement `ListView` by default as `HeapListView`
and only replace it if it needs to be state-backed? IMO, this has a few benefits:

- a UDAGG can be more easily tested because the accumulator does not need 
to be changed.
- it is more efficient, because we only need to touch the accumulator if it 
needs to be in the state backend.
- should be easier to implement

What do you think @kaibozhou, @wuchong?
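The suggestion above can be sketched as follows. All names here (`ListView`, `StateBackedListView`) are illustrative stand-ins, not Flink's actual API: the view ships with a default heap-backed implementation, so a UDAGG accumulator can be constructed and tested without any state backend, and the runtime would swap in a state-backed subclass only when the accumulator actually lives in state.

```java
import java.util.ArrayList;
import java.util.List;

// Default heap-backed view: usable as-is in unit tests of a UDAGG.
class ListView<T> {
    private final List<T> elements = new ArrayList<>(); // default heap storage

    public void add(T value) { elements.add(value); }

    public Iterable<T> get() { return elements; }
}

// A state-backed variant overrides add/get to talk to the state backend;
// here the backend is simulated with an externally supplied list.
class StateBackedListView<T> extends ListView<T> {
    private final List<T> store;

    StateBackedListView(List<T> store) { this.store = store; }

    @Override
    public void add(T value) { store.add(value); }

    @Override
    public Iterable<T> get() { return store; }
}

public class ListViewSketch {
    public static void main(String[] args) {
        ListView<String> view = new ListView<>();
        view.add("a");
        view.add("b");
        System.out.println(view.get()); // [a, b] -- no state backend needed
    }
}
```

This is the testability benefit from the first bullet: `createAccumulator()` and `accumulate()` can be exercised directly against the heap-backed default.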




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105210#comment-16105210
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130114248
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -561,4 +679,28 @@ object UserDefinedFunctionUtils {
   }
 }
   }
+
+  /**
+* Gets a field value from an object.
+*
+* @param clazz class to be analyzed.
+* @param obj Object to get the field value from.
+* @param fieldName Field name.
+* @return Field value.
+*/
+  def getFieldValue(
--- End diff --

Do we need this method if we can get the type infos from the data views 
with a `private[flink]` method?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130039340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether it is backed by a Java ArrayList or by the state backend.
+  * It will be replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
--- End diff --

`accumulate` methods are not overloaded.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105190#comment-16105190
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130086695
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
+class  MapViewTypeInfo[K, V](
+val keyType: TypeInformation[K],
+val valueType: TypeInformation[V])
+  extends TypeInformation[MapView[K, V]] {
+
+  @PublicEvolving
+  override def isBasicType = false
+
+  @PublicEvolving
+  override def isTupleType = false
+
+  @PublicEvolving
+  override def getArity = 0
--- End diff --

should be `1`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105209#comment-16105209
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130086721
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
+class  MapViewTypeInfo[K, V](
+val keyType: TypeInformation[K],
+val valueType: TypeInformation[V])
+  extends TypeInformation[MapView[K, V]] {
+
+  @PublicEvolving
+  override def isBasicType = false
+
+  @PublicEvolving
+  override def isTupleType = false
+
+  @PublicEvolving
+  override def getArity = 0
+
+  @PublicEvolving
+  override def getTotalFields = 2
--- End diff --

should be `1`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105194#comment-16105194
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130115757
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -82,11 +84,13 @@ object AggregateUtil {
   isRowsClause: Boolean)
 : ProcessFunction[CRow, CRow] = {
 
-val (aggFields, aggregates, accTypes) =
+val isUseState = true
--- End diff --

rename to `isStateBackedDataViews`?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7118) Remove hadoop1.x code in HadoopUtils

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105237#comment-16105237
 ] 

ASF GitHub Bot commented on FLINK-7118:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4285


> Remove hadoop1.x code in HadoopUtils
> 
>
> Key: FLINK-7118
> URL: https://issues.apache.org/jira/browse/FLINK-7118
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> Since Flink no longer supports Hadoop 1.x, we should remove this code. The 
> code below resides in {{org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils}}
>   
> {code:java}
> public static JobContext instantiateJobContext(Configuration configuration, 
> JobID jobId) throws Exception {
>   try {
>   Class clazz;
>   // for Hadoop 1.xx
>   if(JobContext.class.isInterface()) {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, 
> Thread.currentThread().getContextClassLoader());
>   }
>   // for Hadoop 2.xx
>   else {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.JobContext", true, 
> Thread.currentThread().getContextClassLoader());
>   }
>   Constructor constructor = 
> clazz.getConstructor(Configuration.class, JobID.class);
>   JobContext context = (JobContext) 
> constructor.newInstance(configuration, jobId);
>   
>   return context;
>   } catch(Exception e) {
>   throw new Exception("Could not create instance of 
> JobContext.");
>   }
>   }
> {code}
> And 
> {code:java}
>   public static TaskAttemptContext 
> instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID 
> taskAttemptID) throws Exception {
>   try {
>   Class clazz;
>   // for Hadoop 1.xx
>   if(JobContext.class.isInterface()) {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
>   }
>   // for Hadoop 2.xx
>   else {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
>   }
>   Constructor constructor = 
> clazz.getConstructor(Configuration.class, TaskAttemptID.class);
>   TaskAttemptContext context = (TaskAttemptContext) 
> constructor.newInstance(configuration, taskAttemptID);
>   
>   return context;
>   } catch(Exception e) {
>   throw new Exception("Could not create instance of 
> TaskAttemptContext.");
>   }
>   }
> {code}
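What removing the Hadoop 1.x branches amounts to can be sketched like this. The Hadoop types below are stubs so the snippet compiles standalone; the real code uses the `org.apache.hadoop` classes, and the exact replacement chosen in the PR may differ.

```java
// With only Hadoop 2.x supported, the reflective Class.forName lookup
// collapses into a direct constructor call: no interface check, no
// reflection, no broad catch-and-rethrow.
class Configuration { }

class JobID { }

class JobContextImpl {
    final Configuration conf;
    final JobID jobId;

    JobContextImpl(Configuration conf, JobID jobId) {
        this.conf = conf;
        this.jobId = jobId;
    }
}

public class HadoopUtilsSketch {
    public static JobContextImpl instantiateJobContext(Configuration conf, JobID jobId) {
        return new JobContextImpl(conf, jobId);
    }

    public static void main(String[] args) {
        JobContextImpl ctx = instantiateJobContext(new Configuration(), new JobID());
        System.out.println(ctx.conf != null && ctx.jobId != null); // true
    }
}
```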





[jira] [Resolved] (FLINK-7118) Remove hadoop1.x code in HadoopUtils

2017-07-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7118.
--
Resolution: Fixed

Fixed via 5725e63309269a02d93046df5cca724eefb25e2d

> Remove hadoop1.x code in HadoopUtils
> 
>
> Key: FLINK-7118
> URL: https://issues.apache.org/jira/browse/FLINK-7118
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> Since Flink no longer supports Hadoop 1.x, we should remove this code. The 
> code below resides in {{org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils}}
>   
> {code:java}
> public static JobContext instantiateJobContext(Configuration configuration, 
> JobID jobId) throws Exception {
>   try {
>   Class clazz;
>   // for Hadoop 1.xx
>   if(JobContext.class.isInterface()) {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, 
> Thread.currentThread().getContextClassLoader());
>   }
>   // for Hadoop 2.xx
>   else {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.JobContext", true, 
> Thread.currentThread().getContextClassLoader());
>   }
>   Constructor constructor = 
> clazz.getConstructor(Configuration.class, JobID.class);
>   JobContext context = (JobContext) 
> constructor.newInstance(configuration, jobId);
>   
>   return context;
>   } catch(Exception e) {
>   throw new Exception("Could not create instance of 
> JobContext.");
>   }
>   }
> {code}
> And 
> {code:java}
>   public static TaskAttemptContext 
> instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID 
> taskAttemptID) throws Exception {
>   try {
>   Class clazz;
>   // for Hadoop 1.xx
>   if(JobContext.class.isInterface()) {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
>   }
>   // for Hadoop 2.xx
>   else {
>   clazz = 
> Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
>   }
>   Constructor constructor = 
> clazz.getConstructor(Configuration.class, TaskAttemptID.class);
>   TaskAttemptContext context = (TaskAttemptContext) 
> constructor.newInstance(configuration, taskAttemptID);
>   
>   return context;
>   } catch(Exception e) {
>   throw new Exception("Could not create instance of 
> TaskAttemptContext.");
>   }
>   }
> {code}





[jira] [Commented] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105236#comment-16105236
 ] 

ASF GitHub Bot commented on FLINK-7134:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4362


> Remove hadoop1.x code in mapreduce.utils.HadoopUtils
> 
>
> Key: FLINK-7134
> URL: https://issues.apache.org/jira/browse/FLINK-7134
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> This JIRA is similar to FLINK-7118. For a clearer format and review, I 
> separated the two JIRAs.





[GitHub] flink pull request #4362: [FLINK-7134] Remove hadoop1.x code in mapreduce.ut...

2017-07-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4362




[GitHub] flink pull request #4285: [FLINK-7118] [hadoop] Remove hadoop1.x code in Had...

2017-07-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4285




[jira] [Resolved] (FLINK-7134) Remove hadoop1.x code in mapreduce.utils.HadoopUtils

2017-07-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7134.
--
Resolution: Fixed

Fixed via 8e367731019ee70e2f6dc1be21c80f51a2ef6a2b

> Remove hadoop1.x code in mapreduce.utils.HadoopUtils
> 
>
> Key: FLINK-7134
> URL: https://issues.apache.org/jira/browse/FLINK-7134
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: mingleizhang
>Assignee: mingleizhang
> Fix For: 1.4.0
>
>
> This JIRA is similar to FLINK-7118. For a clearer format and review, I 
> separated the two JIRAs.





[jira] [Resolved] (FLINK-7092) Shutdown ResourceManager components properly

2017-07-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7092.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 6d3cffc17be5f4f5b586ef9fa1b2feb9204369dd

> Shutdown ResourceManager components properly
> 
>
> Key: FLINK-7092
> URL: https://issues.apache.org/jira/browse/FLINK-7092
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> The {{MesosResourceManager}} starts internally a {{TaskMonitor}}, 
> {{LaunchCoordinator}}, {{ConnectionMonitor}} and a 
> {{ReconciliationCoordinator}}. These components have to be properly shut down 
> when the {{MesosResourceManager}} closes.





[GitHub] flink pull request #4289: [FLINK-7092] [mesos] Shutdown ResourceManager comp...

2017-07-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4289




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130071770
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
--- End diff --

There is `SerializerTestBase` which can be used to implement unit tests for 
`TypeSerializer`
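The round-trip property that such serializer unit tests check can be sketched outside Flink as well. The class below is a plain stand-in, not Flink's `TypeSerializer` API; it only mirrors the length-prefixed format described in the quoted Javadoc (a four-byte element count followed by each serialized element):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Stand-in list serializer (not Flink code): writes a four-byte element
// count, then each element, matching the documented wire format.
final class IntListSerializer {
    byte[] serialize(List<Integer> list) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(list.size());        // length prefix
            for (int v : list) {
                out.writeInt(v);              // element payload
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    List<Integer> deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            List<Integer> list = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                list.add(in.readInt());
            }
            return list;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A serialize/deserialize round trip that yields an equal list is the core assertion such a test base automates for every `TypeSerializer` method.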




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130106082
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
--- End diff --

Please add comments to this method




[jira] [Commented] (FLINK-7296) Validate commit messages in git pre-receive hook

2017-07-28 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105217#comment-16105217
 ] 

Greg Hogan commented on FLINK-7296:
---

[~uce] I'm trying to think through where this should run. On the client side 
committers may not have a (pre-push?) hook configured on a repo, so changes could 
slip through. GitHub doesn't allow server-side hooks (instead sending out 
webhooks notifications). Apache Infra may allow a server-side hook but would 
require a ticket to update (and if ever switching to GitBox would require 
committers to continue pushing to the Apache-hosted repo).

> Validate commit messages in git pre-receive hook
> 
>
> Key: FLINK-7296
> URL: https://issues.apache.org/jira/browse/FLINK-7296
> Project: Flink
>  Issue Type: Improvement
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> Would like to investigate a pre-receive (server-side) hook analyzing the 
> commit messages of incoming revisions on the {{master}} branch for the standard 
> JIRA format ({{\[FLINK-\] \[component\] ...}} or {{\[hotfix\] 
> \[component\] ...}}).





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105195#comment-16105195
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130106082
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
--- End diff --

Please add comments to this method


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105199#comment-16105199
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130074361
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copy and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
--- End diff --

Can this method be easier implemented by calling 
`listSerializer.ensureCompatibility` and checking if the returned 
`CompatibilityResult` requires migration. If the passed serializer is not 
`null` it is wrapped in a `ListViewSerializer`.

This won't work for `MapViewSerializer` because it has to handle two 
serializers, but for `ListViewSerializer` it should work.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105201#comment-16105201
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130071770
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
--- End diff --

There is `SerializerTestBase` which can be used to implement unit tests for 
`TypeSerializer`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105202#comment-16105202
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130113134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
--- End diff --

If we add `private[flink]` methods to `MapView` (and ListView) to access 
the key and value type infos we don't need to use reflection.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130071629
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
--- End diff --

this should be `obj.asInstanceOf[ListViewSerializer[_]].listSerializer`
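The bug the comment points out (casting the argument to the wrapped serializer's type instead of the wrapper's own type) is a general hazard for delegating `equals`. A minimal stand-in, not Flink code, illustrating the corrected pattern:

```java
// Stand-in wrapper type (not Flink code): equals must check the argument is
// another wrapper and then compare the wrapped delegates; casting the
// argument to the delegate's type would compare the wrong objects.
final class ViewWrapper {
    final String delegate;

    ViewWrapper(String delegate) { this.delegate = delegate; }

    @Override
    public boolean equals(Object obj) {
        // canEqual-style check: argument must be the same concrete class.
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        // Correct: cast to the wrapper type, then compare inner delegates.
        return delegate.equals(((ViewWrapper) obj).delegate);
    }

    @Override
    public int hashCode() { return delegate.hashCode(); }
}
```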




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105207#comment-16105207
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130059497
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * Heap view factory to create [[HeapListView]] or [[HeapMapView]].
+  *
+  */
+class HeapViewFactory() extends DataViewFactory() {
+
+  override protected def createListView[T](id: String): ListView[T] = new 
HeapListView[T]
+
+  override protected def createMapView[K, V](id: String): MapView[K, V] = 
new HeapMapView[K, V]
+}
+
+class HeapListView[T] extends ListView[T] {
+
+  val list = new util.ArrayList[T]()
+
+  def this(t: util.List[T]) = {
+this()
+list.addAll(t)
+  }
+
+  override def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  override def add(value: T): Unit = list.add(value)
+
+  override def clear(): Unit = list.clear()
+}
+
+class HeapMapView[K, V] extends MapView[K, V] {
--- End diff --

Could this be the default implementation of `MapView`?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105179#comment-16105179
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130058705
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/DataViewFactory.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * Factory to create [[ListView]] or [[MapView]].
+  *
+  */
+abstract class DataViewFactory() extends Serializable {
--- End diff --

Would we need this if we only need to replace the view if it is backed by a 
state backend?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130073508
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copied and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
+  : CompatibilityResult[ListView[T]] = {
+configSnapshot match {
+  case snapshot: CollectionSerializerConfigSnapshot[_] =>
+val previousKvSerializersAndConfigs = 
snapshot.getNestedSerializersAndConfigs
--- End diff --

use `getSingleNestedSerializerAndConfig()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130059825
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
--- End diff --

return `false`


---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105183#comment-16105183
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130054412
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the list is backed by a Java ArrayList or by a state backend.
+  * It will be replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or {@code null} if the list is 
empty.
+*/
+  def get: JIterable[T] = throw new 
UnsupportedOperationException("Unsupported operation!")
+
+  /**
+* Adds the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = throw new 
UnsupportedOperationException("Unsupported operation!")
--- End diff --

Just a thought. How about we implement `ListView` by default as 
`HeapListView` and only replace it if it needs to be state-backed? IMO, this 
has a few benefits:

- a UDAGG can be more easily tested because the accumulator does not need 
to be changed.
- it is more efficient, because we only need to touch the accumulator if it 
needs to be in the state backend.
- should be easier to implement

What do you think @kaibozhou, @wuchong?
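The proposal above (a heap-backed default that the runtime only swaps for a state-backed view when necessary) can be sketched as follows. This is a hypothetical, self-contained Java sketch: the `ListView` class and its `add`/`get` methods mirror the API under review, but none of this is Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical default implementation: a plain heap list, so a UDAGG's
// accumulator can be created and tested without any state backend.
class ListView<T> {
    private final List<T> list = new ArrayList<>();

    // Returns an iterable over the current elements.
    public Iterable<T> get() {
        return list;
    }

    // Appends the given value to the list.
    public void add(T value) {
        list.add(value);
    }
}

public class ListViewDemo {
    public static void main(String[] args) {
        ListView<String> view = new ListView<>();
        view.add("a");
        view.add("b");
        int count = 0;
        for (String s : view.get()) {
            count++;
        }
        System.out.println(count); // prints 2
    }
}
```

Under this sketch, only accumulators that end up in a state backend would have the field replaced by a state-backed view; everything else touches the plain heap list directly.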


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105178#comment-16105178
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130039340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the list is backed by a Java ArrayList or by a state backend.
+  * It will be replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
--- End diff --

`accumulate` methods are not overloaded.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105208#comment-16105208
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130072175
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copied and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
--- End diff --

Please align as:

```
override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot): 
CompatibilityResult[ListView[T]] = {

  configSnapshot match {
...
  }
}

```


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105189#comment-16105189
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130051854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the list is backed by a Java ArrayList or by a state backend.
+  * It will be replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
--- End diff --

`with ListState[T]` or `with AppendingState[T, JIterable[T]]`?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Reopened] (FLINK-6996) FlinkKafkaProducer010 doesn't guarantee at-least-once semantic

2017-07-28 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-6996:
--

There seems to be a test instability with 
{{Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}}.

https://s3.amazonaws.com/archive.travis-ci.org/jobs/258538641/log.txt

> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --
>
> Key: FLINK-6996
> URL: https://issues.apache.org/jira/browse/FLINK-6996
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> FlinkKafkaProducer010 doesn't implement CheckpointedFunction interface. This 
> means, when it's used like a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html])
>  it will not flush the data on "snapshotState"  as it is supposed to.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130114248
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -561,4 +679,28 @@ object UserDefinedFunctionUtils {
   }
 }
   }
+
+  /**
+* Get field value from an object.
+*
+* @param clazz class to be analyzed.
+* @param obj Object to get field value.
+* @param fieldName Field name.
+* @return Field value.
+*/
+  def getFieldValue(
--- End diff --

Do we need this method if we can get the type infos from the data views 
with a `private[flink]` method?


---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105211#comment-16105211
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130112099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Get data view type information from the accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
--- End diff --

Couldn't you instead take the accumulator type information and use the 
field information?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105187#comment-16105187
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130059458
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/HeapViewFactory.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * Heap view factory to create [[HeapListView]] or [[HeapMapView]].
+  */
+class HeapViewFactory() extends DataViewFactory() {
+
+  override protected def createListView[T](id: String): ListView[T] = new 
HeapListView[T]
+
+  override protected def createMapView[K, V](id: String): MapView[K, V] = 
new HeapMapView[K, V]
+}
+
+class HeapListView[T] extends ListView[T] {
--- End diff --

Could this be the default implementation of `ListView`?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130074361
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copied and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
--- End diff --

Can this method be easier implemented by calling 
`listSerializer.ensureCompatibility` and checking if the returned 
`CompatibilityResult` requires migration. If the passed serializer is not 
`null` it is wrapped in a `ListViewSerializer`.

This won't work for `MapViewSerializer` because it has to handle two 
serializers, but for `ListViewSerializer` it should work.
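The delegation described above can be illustrated with a hypothetical Java sketch. `CompatResult`, `ListSer`, and `ListViewSer` are simplified stand-ins invented for this example; Flink's real `CompatibilityResult` and `TypeSerializer` APIs differ in signatures and generics.

```java
import java.util.List;

// Stand-in for Flink's CompatibilityResult: either "compatible" or
// "requires migration", optionally carrying a convert serializer.
final class CompatResult<T> {
    final boolean requiresMigration;
    final Object convertSerializer; // stand-in type; may be null

    private CompatResult(boolean migrate, Object convert) {
        this.requiresMigration = migrate;
        this.convertSerializer = convert;
    }

    static <T> CompatResult<T> compatible() {
        return new CompatResult<>(false, null);
    }

    static <T> CompatResult<T> migrate(Object convert) {
        return new CompatResult<>(true, convert);
    }
}

// Stand-in for the nested list serializer.
class ListSer<T> {
    CompatResult<List<T>> ensureCompatibility(Object snapshot) {
        // the real logic lives in Flink's ListSerializer;
        // this stand-in always reports "compatible"
        return CompatResult.compatible();
    }
}

// Stand-in for the wrapping view serializer: instead of re-implementing
// the compatibility check, it delegates to the inner serializer and only
// re-wraps the result.
class ListViewSer<T> {
    private final ListSer<T> listSer;

    ListViewSer(ListSer<T> listSer) {
        this.listSer = listSer;
    }

    CompatResult<Object> ensureCompatibility(Object snapshot) {
        CompatResult<List<T>> inner = listSer.ensureCompatibility(snapshot);
        if (!inner.requiresMigration) {
            return CompatResult.compatible();
        }
        // if the inner result carries a convert serializer, wrap it back
        // into a view serializer; otherwise propagate the bare migration
        Object convert = (inner.convertSerializer == null)
            ? null
            : new ListViewSer<>((ListSer<T>) inner.convertSerializer);
        return CompatResult.migrate(convert);
    }
}

public class CompatDemo {
    public static void main(String[] args) {
        CompatResult<Object> r =
            new ListViewSer<>(new ListSer<String>()).ensureCompatibility(new Object());
        System.out.println(r.requiresMigration); // prints false
    }
}
```

As the comment notes, this only works when there is a single nested serializer; a map-view serializer would have to combine two inner results.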


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130040193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the list is backed by a Java ArrayList or by a state backend.
+  * It will be replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or {@code null} if the list is empty.
+*/
+  def get: JIterable[T] = throw new 
UnsupportedOperationException("Unsupported operation!")
--- End diff --

is this Java's `java.lang.UnsupportedOperationException`?


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130073596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copied and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
+  : CompatibilityResult[ListView[T]] = {
+configSnapshot match {
+  case snapshot: CollectionSerializerConfigSnapshot[_] =>
+val previousKvSerializersAndConfigs = 
snapshot.getNestedSerializersAndConfigs
--- End diff --

Change name to `previousListSerializerAndConfig`


---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105181#comment-16105181
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130053061
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
--- End diff --

```
ListView provides List functionality for accumulators used by user-defined 
aggregate functions {{AggregateFunction}}. 
A ListView can be backed by a Java ArrayList or a state backend, depending 
on the context in which the function is used. 

At runtime `ListView` will be replaced by a {@link StateListView} or a 
{@link HeapListView}. 
Hence, the `ListView`'s methods do not need to be implemented.
```


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130113134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Get data view type information from the accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
--- End diff --

If we add `private[flink]` methods to `MapView` (and ListView) to access 
the key and value type infos we don't need to use reflection.
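For readers following along, the reflective access under discussion can be sketched in isolation (the `View` class and `fieldValue` helper below are hypothetical stand-ins, not the Flink types). A `private[flink]` accessor on `MapView`/`ListView` would let callers skip this machinery entirely:

```java
import java.lang.reflect.Field;

public class ReflectionSketch {
    // Hypothetical stand-in for a view whose type info sits in a private field.
    static class View {
        private final String elementTypeInfo;
        View(String elementTypeInfo) { this.elementTypeInfo = elementTypeInfo; }
    }

    // Mirrors the getFieldValue-style access: look up the declared field,
    // open it, and read its value from the given instance.
    static Object fieldValue(Object obj, String name) {
        try {
            Field f = obj.getClass().getDeclaredField(name);
            f.setAccessible(true); // required because the field is private
            return f.get(obj);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fieldValue(new View("STRING"), "elementTypeInfo")); // prints STRING
    }
}
```

A direct accessor avoids both the `setAccessible` call and the string-keyed lookup, which is what the suggestion trades the reflection for.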


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130104980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Get data view type information from the accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
+  val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], 
mapView, "valueTypeInfo")
+  if (keyTypeInfo != null && valueTypeInfo != null) {
+  resultMap.put(field.getName, new MapViewTypeInfo(
+keyTypeInfo.asInstanceOf[TypeInformation[_]],
+valueTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+} else if (field.getType == classOf[ListView[_]]) {
+  val listView = field.get(acc)
+  val elementTypeInfo = getFieldValue(classOf[ListView[_]], 
listView, "elementTypeInfo")
+  if (elementTypeInfo != null) {
+resultMap.put(field.getName,
+  new 
ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+}
+  }
+}
+
+resultMap
+  }
+
+
+  /**
+* Extract data view specification.
+*
+* @param index aggregate function index
+* @param aggFun aggregate function
+* @param accType accumulator type information
+* @param dataViewTypes data view fields types
+* @param isUseState is use state
+* @return the data view specification
+*/
+  def extractDataViewTypeInfo(
--- End diff --

This method should get some inline comments.


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130085690
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[HeapMapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = mapSerializer.isImmutableType
--- End diff --

`false`


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130126823
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
--- End diff --

I don't think we should implement `COUNT DISTINCT` as a special 
`AggregateFunction`. At least not in the long term. 

I think it would be better to handle this inside of the 
`GeneratedAggregations` and only accumulate and retract distinct values from 
user-defined aggregate functions. With this approach, any aggregation function 
can be used with `DISTINCT` and the state for distinction can also be shared 
across multiple aggregation functions. This is also the approach that has been 
started in PR #3783.

For now this is fine, but in the long run we should go for something like 
PR #3783 (which also requires the `GeneratedAggregations.initialize()` method.)
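Independent of Flink's classes, the shared-distinct-state idea can be sketched as follows (a hypothetical helper, not the PR's `CountDistinct`): keep a count per value so a value only leaves the distinct set once its last copy is retracted, which is the behavior a map-backed `DISTINCT` needs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of distinct tracking with retraction support.
public class DistinctTracker<T> {
    private final Map<T, Long> counts = new HashMap<>();

    /** Returns true if the value is new to the distinct set. */
    public boolean accumulate(T value) {
        long n = counts.getOrDefault(value, 0L);
        counts.put(value, n + 1);
        return n == 0L;
    }

    /** Returns true if the last copy of the value was removed. */
    public boolean retract(T value) {
        Long n = counts.get(value);
        if (n == null) {
            return false;         // nothing to retract
        } else if (n == 1L) {
            counts.remove(value); // last copy gone: value leaves the set
            return true;
        } else {
            counts.put(value, n - 1);
            return false;
        }
    }

    public int distinctCount() {
        return counts.size();
    }

    public static void main(String[] args) {
        DistinctTracker<String> t = new DistinctTracker<>();
        t.accumulate("a");
        t.accumulate("a");
        t.retract("a");
        System.out.println(t.distinctCount()); // prints 1: one copy of "a" remains
    }
}
```

Because the counts are per value rather than per aggregate, the same map could back `DISTINCT` for several aggregation functions at once, which is the sharing argued for above.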


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130117980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -108,30 +112,38 @@ object AggregateUtil {
   outputArity,
   needRetract = false,
   needMerge = false,
-  needReset = false
+  needReset = false,
+  accConfig = Some(DataViewConfig(accSpecs, isUseState))
 )
 
+val accConfig = accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
 if (isRowTimeType) {
   if (isRowsClause) {
 // ROWS unbounded over process function
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
   CRowTypeInfo(inputTypeInfo),
-  queryConfig)
+  queryConfig,
+  accConfig)
--- End diff --

I think it would be better to keep `accConfig` out of the ProcessFunctions. 
It is just passing information to the `GeneratedAggregations` which could also 
be code generated. The only thing that we need is a `RuntimeContext` in 
`GeneratedAggregations`. Therefore, I propose to add a method 
`GeneratedAggregations.initialize(ctx: RuntimeContext())` instead of adding 
`GeneratedAggregations.setDataViewFactory()`. In `initialize()` we can generate 
code that registers all necessary state by itself and keeps it as member 
variables.

I think this would be cleaner because it encapsulates everything that's 
related to aggregation functions in the code-gen'd class. 

If we use heap state in `MapView` and `ListView` as default, we also won't 
need `DataViewFactory` because we can generate all state access directly (if 
required).
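The proposed `initialize(ctx)` shape can be sketched abstractly (every name below is hypothetical, not Flink's API): the generated class receives the runtime context once and registers its own state, so nothing has to be injected from the ProcessFunction afterwards.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InitializeSketch {
    /** Minimal stand-in for a RuntimeContext that hands out named list state. */
    static class FakeRuntimeContext {
        private final Map<String, List<String>> states = new HashMap<>();
        List<String> getListState(String name) {
            return states.computeIfAbsent(name, k -> new ArrayList<>());
        }
    }

    /** Stand-in for the code-generated aggregations class. */
    static class MyAggs {
        private List<String> listView;

        // In the proposal, code like this would be generated into initialize():
        // the class registers/obtains its own state from the context.
        void initialize(FakeRuntimeContext ctx) {
            this.listView = ctx.getListState("acc0_list");
        }

        void accumulate(String value) { listView.add(value); }
        int size() { return listView.size(); }
    }

    public static void main(String[] args) {
        FakeRuntimeContext ctx = new FakeRuntimeContext();
        MyAggs aggs = new MyAggs();
        aggs.initialize(ctx);
        aggs.accumulate("x");
        System.out.println(aggs.size()); // prints 1
    }
}
```

The state wiring lives entirely inside the generated class; the surrounding operator only supplies the context once, which is the encapsulation the comment argues for.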


---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105191#comment-16105191
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130073508
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[HeapListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = listSerializer.isImmutableType
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = new 
HeapListView[T](listSerializer.createInstance())
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val list = from.asInstanceOf[HeapListView[T]].list
+new HeapListView[T](listSerializer.copy(list))
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+val list = record.asInstanceOf[HeapListView[T]].list
+listSerializer.serialize(list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] =
+new HeapListView[T](listSerializer.deserialize(source))
+
+  override def deserialize(reuse: ListView[T], source: DataInputView): 
ListView[T] =
+deserialize(source)
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit =
+listSerializer.copy(source, target)
+
+  override def canEqual(obj: scala.Any): Boolean = obj != null && 
obj.getClass == getClass
+
+  override def hashCode(): Int = listSerializer.hashCode()
+
+  override def equals(obj: Any): Boolean = canEqual(this) &&
+listSerializer.equals(obj.asInstanceOf[ListSerializer[_]])
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+listSerializer.snapshotConfiguration()
+
+  // copied and modified from ListSerializer.ensureCompatibility
+  override def ensureCompatibility(configSnapshot: 
TypeSerializerConfigSnapshot)
+  : CompatibilityResult[ListView[T]] = {
+configSnapshot match {
+  case snapshot: CollectionSerializerConfigSnapshot[_] =>
+val previousKvSerializersAndConfigs = 
snapshot.getNestedSerializersAndConfigs
--- End diff --

use `getSingleNestedSerializerAndConfig()`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaib

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130038499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the view is backed by a Java ArrayList or by a state backend.
+  * At runtime it will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
--- End diff --

ScalaDocs are not formatted with HTML: 
http://docs.scala-lang.org/style/scaladoc.html


---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130086695
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
+class MapViewTypeInfo[K, V](
+val keyType: TypeInformation[K],
+val valueType: TypeInformation[V])
+  extends TypeInformation[MapView[K, V]] {
+
+  @PublicEvolving
+  override def isBasicType = false
+
+  @PublicEvolving
+  override def isTupleType = false
+
+  @PublicEvolving
+  override def getArity = 0
--- End diff --

should be `1`


---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105184#comment-16105184
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130039762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView encapsulates the operation of map.
+  *
+  * None of the methods in this class are implemented; users do not need to
+  * care whether the view is backed by a Java HashMap or by a state backend.
+  * At runtime it will be replaced by a {@link StateMapView} or a
+  * {@link HeapMapView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView<String, Integer> map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *//Overloaded accumulate method
--- End diff --

`accumulate()` is not overloaded here.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130115155
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Get data view type information from the accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
+  val valueTypeInfo = getFieldValue(classOf[MapView[_, _]], 
mapView, "valueTypeInfo")
+  if (keyTypeInfo != null && valueTypeInfo != null) {
+  resultMap.put(field.getName, new MapViewTypeInfo(
+keyTypeInfo.asInstanceOf[TypeInformation[_]],
+valueTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+} else if (field.getType == classOf[ListView[_]]) {
+  val listView = field.get(acc)
+  val elementTypeInfo = getFieldValue(classOf[ListView[_]], 
listView, "elementTypeInfo")
+  if (elementTypeInfo != null) {
+resultMap.put(field.getName,
+  new 
ListViewTypeInfo(elementTypeInfo.asInstanceOf[TypeInformation[_]]))
+  }
+}
+  }
+}
+
+resultMap
+  }
+
+
+  /**
+    * Extract the data view specifications.
+    *
+    * @param index aggregate function index
+    * @param aggFun aggregate function
+    * @param accType accumulator type information
+    * @param dataViewTypes data view field types
+    * @param isUseState whether the data views are backed by state
+    * @return the data view specifications
+    */
+  def extractDataViewTypeInfo(
--- End diff --

Rename method to `removeStateViewFieldsFromAccTypeInfo`?
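The field-scanning technique in the method above can be sketched with plain Java reflection (the `DataView`/`MapView`/`ListView` classes below are local stand-ins rather than the Flink types, and unlike Flink's `TypeExtractor.getAllDeclaredFields` this sketch does not walk superclasses):

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

interface DataView {}                             // marker interface stand-in
class MapView<K, V> implements DataView {}
class ListView<T> implements DataView {}

class Accum {
    MapView<String, Integer> map = new MapView<>();
    ListView<String> list = new ListView<>();
    long count;                                   // plain field, must be ignored
}

public class FieldScan {
    // Walk the declared fields of an accumulator instance and keep only
    // those whose type is assignable to the DataView marker interface.
    static Map<String, Class<?>> dataViewFields(Object acc) {
        Map<String, Class<?>> result = new HashMap<>();
        for (Field f : acc.getClass().getDeclaredFields()) {
            f.setAccessible(true);
            if (DataView.class.isAssignableFrom(f.getType())) {
                result.put(f.getName(), f.getType());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> views = dataViewFields(new Accum());
        System.out.println(views.size());         // prints "2" ("map" and "list")
    }
}
```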


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105205#comment-16105205
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130075088
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.ListSerializer
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * [[ListView]] type information.
+  *
+  * @param elementType element type information
+  * @tparam T element type
+  */
+@PublicEvolving
+class ListViewTypeInfo[T](val elementType: TypeInformation[T])
+  extends TypeInformation[ListView[T]] {
+
+  override def isBasicType: Boolean = false
+
+  override def isTupleType: Boolean = false
+
+  override def getArity: Int = 0
--- End diff --

This must be `1`.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105204#comment-16105204
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130085690
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[HeapMapView]]. The serializer relies on a key serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for the length of the map,
+  * followed by the serialized representation of each key-value pair. To allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = mapSerializer.isImmutableType
--- End diff --

`false`
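The wire format described in the Scaladoc above — a four-byte size, then each key-value pair with a null marker before the value — can be sketched with plain `java.io` streams; the `String` key/value types and the `MapWireFormat` class name are assumptions for illustration, not the Flink serializer:

```java
import java.io.*;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapWireFormat {
    // Write: int size, then per entry: key, null-marker boolean, value.
    static byte[] serialize(Map<String, String> map) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(map.size());                 // four bytes for the length of the map
        for (Map.Entry<String, String> e : map.entrySet()) {
            out.writeUTF(e.getKey());             // keys may not be null
            if (e.getValue() == null) {
                out.writeBoolean(true);           // null marker: no value follows
            } else {
                out.writeBoolean(false);
                out.writeUTF(e.getValue());
            }
        }
        return bytes.toByteArray();
    }

    static Map<String, String> deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int size = in.readInt();
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            map.put(key, in.readBoolean() ? null : in.readUTF());
        }
        return map;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("a", "1");
        m.put("b", null);                         // null value survives the round trip
        System.out.println(deserialize(serialize(m)).equals(m)); // prints "true"
    }
}
```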




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130051854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to care
+  * whether it is backed by a Java ArrayList or by a state backend. It will be
+  * replaced by a {@link StateListView} or a {@link HeapListView}.
+  *
+  * <p>
+  * NOTE: Users are not recommended to extend this class.
+  * </p>
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *    @Override
+  *    public MyAccum createAccumulator() {
+  *      MyAccum accum = new MyAccum();
+  *      accum.list = new ListView<>(Types.STRING);
+  *      accum.count = 0L;
+  *      return accum;
+  *    }
+  *
+  *    //Overloaded accumulate method
+  *    public void accumulate(MyAccum accumulator, String id) {
+  *      accumulator.list.add(id);
+  *      ... ...
+  *      accumulator.get()
+  *      ... ...
+  *    }
+  *
+  *    @Override
+  *    public Long getValue(MyAccum accumulator) {
+  *      ... ...
+  *      return accumulator.count;
+  *    }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

`with ListState[T]` or `with AppendingState[T, JIterable[T]]`?
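To make the two backing strategies concrete, here is a minimal heap-backed sketch (the `ListViewBase`/`HeapListView` names are stand-ins, not the real Flink classes; a state-backed variant would delegate to `ListState` instead of an `ArrayList`):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the ListView API: add/get/clear with the backing store hidden.
abstract class ListViewBase<T> {
    public abstract void add(T value);
    public abstract Iterable<T> get();
    public abstract void clear();
}

// Heap-backed implementation; a state-backend-backed sibling would keep the
// same interface but forward each call to keyed state.
class HeapListView<T> extends ListViewBase<T> {
    private final List<T> list = new ArrayList<>();
    @Override public void add(T value) { list.add(value); }
    @Override public Iterable<T> get() { return list; }
    @Override public void clear() { list.clear(); }
}

public class ListViewDemo {
    public static void main(String[] args) {
        ListViewBase<String> view = new HeapListView<>();
        view.add("a");
        view.add("b");
        int n = 0;
        for (String s : view.get()) n++;   // caller iterates without knowing the backing store
        System.out.println(n);             // prints "2"
    }
}
```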



