[jira] [Created] (FLINK-7481) Binary search with integer overflow possibility

2017-08-20 Thread Baihua Su (JIRA)
Baihua Su created FLINK-7481:


 Summary: Binary search with integer overflow possibility
 Key: FLINK-7481
 URL: https://issues.apache.org/jira/browse/FLINK-7481
 Project: Flink
  Issue Type: Bug
Reporter: Baihua Su






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4567: [FLINK-7481] Fix binary search to avoid integer ov...

2017-08-20 Thread sugarbyheart
GitHub user sugarbyheart opened a pull request:

https://github.com/apache/flink/pull/4567

[FLINK-7481] Fix binary search to avoid integer overflow



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sugarbyheart/flink sugar-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4567.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4567


commit 8151d80a06aa149fe1b4ccd3794956077c1d65ac
Author: Tom 
Date:   2017-08-20T07:18:37Z

Fix binary search to avoid integer overflow
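
The notification does not include the diff itself, so the following is only a minimal sketch of the general problem named in the title, not the code changed in this pull request. The class and method names (`OverflowSafeBinarySearch`, `naiveMid`, `safeMid`, `search`) are hypothetical. It shows why `(low + high) / 2` can wrap to a negative value for large indices, and one common way to avoid that.

```java
// Illustrative sketch only; the names here are hypothetical and are not taken from Flink.
final class OverflowSafeBinarySearch {

    private OverflowSafeBinarySearch() {}

    /** Naive midpoint: low + high can exceed Integer.MAX_VALUE and wrap to a negative int. */
    static int naiveMid(int low, int high) {
        return (low + high) / 2;
    }

    /** Safe midpoint for 0 <= low <= high: the difference high - low cannot overflow. */
    static int safeMid(int low, int high) {
        return low + (high - low) / 2;
    }

    /** Binary search over a sorted array; returns the index of key, or -1 if absent. */
    static int search(int[] sorted, int key) {
        int low = 0;
        int high = sorted.length - 1;
        while (low <= high) {
            int mid = safeMid(low, high);
            if (sorted[mid] < key) {
                low = mid + 1;
            } else if (sorted[mid] > key) {
                high = mid - 1;
            } else {
                return mid;
            }
        }
        return -1;
    }
}
```

An unsigned shift, `(low + high) >>> 1`, is another widely used form of the same fix for non-negative indices.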




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7481) Binary search with integer overflow possibility

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134325#comment-16134325
 ] 

ASF GitHub Bot commented on FLINK-7481:
---

GitHub user sugarbyheart opened a pull request:

https://github.com/apache/flink/pull/4567

[FLINK-7481] Fix binary search to avoid integer overflow



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sugarbyheart/flink sugar-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4567.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4567


commit 8151d80a06aa149fe1b4ccd3794956077c1d65ac
Author: Tom 
Date:   2017-08-20T07:18:37Z

Fix binary search to avoid integer overflow




> Binary search with integer overflow possibility
> ---
>
> Key: FLINK-7481
> URL: https://issues.apache.org/jira/browse/FLINK-7481
> Project: Flink
>  Issue Type: Bug
>Reporter: Baihua Su
>






[GitHub] flink issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink s...

2017-08-20 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/4566
  
+1 to merge (assuming `command` is available on all operating systems / or 
is a bash command?)




[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134351#comment-16134351
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/4566
  
+1 to merge (assuming `command` is available on all operating systems / or 
is a bash command?)


> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.





[jira] [Updated] (FLINK-7481) Binary search with integer overflow possibility

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7481:
--
Component/s: Core

> Binary search with integer overflow possibility
> ---
>
> Key: FLINK-7481
> URL: https://issues.apache.org/jira/browse/FLINK-7481
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Baihua Su
>






[jira] [Updated] (FLINK-7461) Remove Backwards compatibility for Flink 1.1 from Flink 1.4

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7461:
--
Component/s: State Backends, Checkpointing

> Remove Backwards compatibility for Flink 1.1 from Flink 1.4
> ---
>
> Key: FLINK-7461
> URL: https://issues.apache.org/jira/browse/FLINK-7461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> This issue tracks the removal of Flink 1.1 backwards compatibility from Flink 
> 1.4. This step is helpful for further developments because it will remove 
> many old code paths and special cases. In particular, we can drop all 
> handling for non-partitionable state, i.e. state that was created with the 
> old {{Checkpointed}} interface.





[jira] [Updated] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7386:
--
Component/s: ElasticSearch Connector

> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and 
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109





[jira] [Updated] (FLINK-7413) Release Hadoop 2.8.x convenience binaries for 1.3.x

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7413:
--
Component/s: Build System

> Release Hadoop 2.8.x convenience binaries for 1.3.x 
> 
>
> Key: FLINK-7413
> URL: https://issues.apache.org/jira/browse/FLINK-7413
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.3
>
>
> At least one user on the mailing lists had an issue because Hadoop 2.8.x 
> binaries are not available: 
> https://lists.apache.org/thread.html/c8badc66778144d9d6c3ee5cb23dd732a66cb6690c6867f47f4bd456@%3Cuser.flink.apache.org%3E
> It should be as easy as adding Hadoop 2.8.x to the list of created binaries 
> in the release files.





[jira] [Updated] (FLINK-7392) Enable more predicate push-down in joins

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7392:
--
Component/s: Table API & SQL

> Enable more predicate push-down in joins
> 
>
> Key: FLINK-7392
> URL: https://issues.apache.org/jira/browse/FLINK-7392
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> This is a follow-up of FLINK-6429.
> As a quick workaround to prevent pushing down projections for time 
> indicators, FLINK-6429 reverts the behavior of {{ProjectJoinTransposeRule}} 
> back to the one in Calcite 1.12.
> As [~jark] suggested in FLINK-6429, we can selectively disable the push-down 
> for time indicators in {{ProjectJoinTransposeRule}}. This JIRA tracks the 
> effort of implementing the suggestion.





[jira] [Updated] (FLINK-7344) Migrate usage of joda-time to the Java 8 DateTime API

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7344:
--
Component/s: Core

> Migrate usage of joda-time to the Java 8 DateTime API
> -
>
> Key: FLINK-7344
> URL: https://issues.apache.org/jira/browse/FLINK-7344
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> As the minimum Java version of Flink has been upgraded to 1.8, it is a good 
> time to migrate all usage of the joda-time package to the native Java 8 
> DateTime API.





[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2017-08-20 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134353#comment-16134353
 ] 

Robert Metzger commented on FLINK-7386:
---

I suspect we need to add a new maven module for ES52 then

[~tzulitai] what's your take on that?

> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and 
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109





[jira] [Updated] (FLINK-7345) Add BRANCH PATTERN.

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7345:
--
Component/s: CEP

> Add BRANCH PATTERN.
> ---
>
> Key: FLINK-7345
> URL: https://issues.apache.org/jira/browse/FLINK-7345
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: zhangxiaoyu
>
> Try to support branch pattern.
> The details are at 
> https://docs.google.com/document/d/1YNjOYF7BagM4agx_TI6hQkmraVLT9ZpKhBsgHDGi-U8/edit#





[jira] [Updated] (FLINK-7448) Keep the data type unchanged when register an existing field as rowtime

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7448:
--
Component/s: Table API & SQL

> Keep the data type unchanged when register an existing field as rowtime
> ---
>
> Key: FLINK-7448
> URL: https://issues.apache.org/jira/browse/FLINK-7448
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> Currently, when we convert a DataStream to a Table, we can register an existing 
> field as the rowtime field. But the rowtime field then becomes type 
> {{TimeIndicator(Timestamp)}}, not {{TimeIndicator(Long)}}. This confuses 
> users, because selecting the rowtime field yields a Timestamp rather than a 
> long.
> {code}
> // stream schema: [Long, Int, Double, Float], take the first field as rowtime
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float)
> table.printSchema()
> // - TimeIndicator(Timestamp), Int, Double, Float
> table.select('rowtime)
> // "1970-01-01 00:00:00.007"
> {code}
> For example, in the test {{TimeAttributesITCase#testCalcMaterialization2}}, the 
> original rowtime field is of type long, but the selected result is of 
> type timestamp. A rowtime attribute shouldn't change
> the type; otherwise, the batch and stream results are not the same.
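
To make the reported symptom concrete, here is a small sketch, assuming the long rowtime value is interpreted as epoch milliseconds in UTC. It uses only `java.time` and does not reproduce Flink's internal conversion; the class and method names are hypothetical. It shows how the long value 7 corresponds to the timestamp string printed in the example above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative sketch only; not Flink code. Assumes epoch milliseconds rendered in UTC.
final class RowtimeRenderingSketch {

    private static final DateTimeFormatter SQL_TIMESTAMP =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    private RowtimeRenderingSketch() {}

    /** Renders a long rowtime (epoch milliseconds) in SQL timestamp style. */
    static String asTimestampString(long epochMillis) {
        return SQL_TIMESTAMP.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

With this reading, a user who stored `7L` in the stream sees `1970-01-01 00:00:00.007` after selecting the field, which is the type change the issue objects to.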





[jira] [Updated] (FLINK-7466) Add a flink connector for Apache RocketMQ

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7466:
--
Component/s: Streaming Connectors

> Add a flink connector for Apache RocketMQ
> -
>
> Key: FLINK-7466
> URL: https://issues.apache.org/jira/browse/FLINK-7466
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: yukon
>
> Hi Flink community:
> Flink is a great stream processing framework, and I would like to 
> contribute a flink-rocketmq-connector. If you think it's acceptable, I will 
> submit a pull request soon.
> Apache RocketMQ is a distributed messaging and streaming platform with low 
> latency, high performance and reliability, trillion-level capacity and 
> flexible scalability. For more information, please refer to http://rocketmq.apache.org/





[jira] [Updated] (FLINK-7296) Validate commit messages in git pre-receive hook

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7296:
--
Component/s: Build System

> Validate commit messages in git pre-receive hook
> 
>
> Key: FLINK-7296
> URL: https://issues.apache.org/jira/browse/FLINK-7296
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> I would like to investigate a pre-receive (server-side) hook that checks the 
> commit messages of incoming revisions on the {{master}} branch for the standard 
> JIRA format ({{\[FLINK-\] \[component\] ...}} or {{\[hotfix\] 
> \[component\] ...}}).
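
As a rough illustration of such a check, the sketch below validates a commit subject line with a regular expression. It assumes the issue number after `FLINK-` is a run of digits and that exactly one component tag follows; both are assumptions, and the class and method names are hypothetical. A real hook would also have to decide how to treat subjects with several issue tags or no component tag.

```java
import java.util.regex.Pattern;

// Illustrative sketch only; the accepted format is an assumption based on the description above.
final class CommitMessageCheck {

    // "[FLINK-<digits>] [component] text" or "[hotfix] [component] text"
    private static final Pattern SUBJECT =
        Pattern.compile("^\\[(FLINK-\\d+|hotfix)\\] \\[[^\\]]+\\] \\S.*$");

    private CommitMessageCheck() {}

    /** Returns true if the one-line commit subject matches the assumed format. */
    static boolean isValidSubject(String subject) {
        return SUBJECT.matcher(subject).matches();
    }
}
```

Under these assumptions, `[hotfix] [docs] Fix typo` is accepted, while a subject without a component tag, such as `[FLINK-7481] Fix binary search to avoid integer overflow`, is rejected.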





[jira] [Updated] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7402:
--
Component/s: Network

> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
>     if (buffer != null) {
>       buffer.recycle();
>     }
>   }
> {code}
> But buffer has been dereferenced in the try block without guard.
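
The pattern can be sketched in isolation as follows. This is not the `NettyMessage` code; `Buffer`, `writeUnguarded` and `writeChecked` are hypothetical stand-ins. The first method mirrors the report: the `finally` block guards against `null`, but the `try` block has already dereferenced the reference, so a `null` buffer still fails. The second method shows one possible resolution, assuming `null` is never a legal argument.

```java
import java.util.Objects;

// Illustrative sketch only; Buffer is a hypothetical stand-in, not Flink's buffer class.
final class NullCheckSketch {

    static final class Buffer {
        boolean recycled;

        int size() {
            return 16;
        }

        void recycle() {
            recycled = true;
        }
    }

    private NullCheckSketch() {}

    /** Mirrors the report: the null check in finally cannot prevent the NPE thrown in try. */
    static int writeUnguarded(Buffer buffer) {
        try {
            return buffer.size(); // throws NullPointerException if buffer is null
        } finally {
            if (buffer != null) {
                buffer.recycle();
            }
        }
    }

    /** One option: reject null up front, which makes the later check unnecessary. */
    static int writeChecked(Buffer buffer) {
        Objects.requireNonNull(buffer, "buffer");
        try {
            return buffer.size();
        } finally {
            buffer.recycle();
        }
    }
}
```

If `null` is a legal input instead, the guard would have to move into the `try` block as well; the issue does not say which of the two is intended.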





[jira] [Updated] (FLINK-7478) Update documentation for sql insert and api change in TableAPI & SQL

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7478:
--
Component/s: Table API & SQL

> Update documentation for sql insert and api change in TableAPI & SQL
> 
>
> Key: FLINK-7478
> URL: https://issues.apache.org/jira/browse/FLINK-7478
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>






[jira] [Updated] (FLINK-7433) Lack of synchronization accessing inProgress in JobCancellationWithSavepointHandlers

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7433:
--
Component/s: State Backends, Checkpointing

> Lack of synchronization accessing inProgress in 
> JobCancellationWithSavepointHandlers
> 
>
> Key: FLINK-7433
> URL: https://issues.apache.org/jira/browse/FLINK-7433
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   try {
>     if (throwable != null) {
>       completed.put(requestId, throwable);
>     } else {
>       completed.put(requestId, path);
>     }
>   } finally {
>     inProgress.remove(jobId);
>   }
> {code}
> The call to inProgress.remove(jobId) should be protected by lock object.
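
A minimal sketch of the suggested locking, assuming `inProgress` and `completed` are plain, non-thread-safe collections that are guarded by a single lock object elsewhere in the class. The class name, field types and helper methods are hypothetical and simplified; they are not the actual handler code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; names and types are hypothetical simplifications.
final class SavepointTrackerSketch {

    private final Object lock = new Object();
    private final Set<String> inProgress = new HashSet<>();
    private final Map<Long, Object> completed = new HashMap<>();

    /** Marks a job as in progress; returns false if it already was. */
    boolean start(String jobId) {
        synchronized (lock) {
            return inProgress.add(jobId);
        }
    }

    /** Records the outcome and clears the in-progress entry under the same lock. */
    void complete(String jobId, long requestId, String path, Throwable throwable) {
        synchronized (lock) {
            try {
                completed.put(requestId, throwable != null ? throwable : path);
            } finally {
                inProgress.remove(jobId);
            }
        }
    }

    boolean isInProgress(String jobId) {
        synchronized (lock) {
            return inProgress.contains(jobId);
        }
    }

    Object result(long requestId) {
        synchronized (lock) {
            return completed.get(requestId);
        }
    }
}
```

Holding the lock around both the `put` and the `remove` keeps readers from observing a request that is neither in progress nor completed.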





[jira] [Updated] (FLINK-7438) Some classes are eclipsed by classes in package scala

2017-08-20 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-7438:
--
Component/s: DataStream API
 Build System

> Some classes are eclipsed by classes in package scala
> -
>
> Key: FLINK-7438
> URL: https://issues.apache.org/jira/browse/FLINK-7438
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> Noticed the following during compilation:
> {code}
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> object OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> [WARNING]  ^
> [WARNING] 
> /flink/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala:33:
>  warning: imported `OutputTag' is permanently  hidden by definition of 
> class OutputTag in package scala
> [WARNING] import org.apache.flink.util.{Collector, OutputTag}
> {code}
> We should avoid the warning w.r.t. OutputTag.
> There may be other occurrences of similar warnings.





[jira] [Commented] (FLINK-7481) Binary search with integer overflow possibility

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134362#comment-16134362
 ] 

ASF GitHub Bot commented on FLINK-7481:
---

Github user sugarbyheart closed the pull request at:

https://github.com/apache/flink/pull/4567


> Binary search with integer overflow possibility
> ---
>
> Key: FLINK-7481
> URL: https://issues.apache.org/jira/browse/FLINK-7481
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Baihua Su
>






[GitHub] flink pull request #4567: [FLINK-7481] Fix binary search to avoid integer ov...

2017-08-20 Thread sugarbyheart
Github user sugarbyheart closed the pull request at:

https://github.com/apache/flink/pull/4567




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-20 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134448#comment-16134448
 ] 

Jark Wu commented on FLINK-7446:


From my perspective, TableSource should provide an interface to define the 
timestamp and watermark at the same time. The {{DefinedRowtimeAttribute}} is 
not a good design. A TableSource does not come from a DataStream; it is 
converted into a DataStream, so the timestamp and watermark should be 
defined by the TableSource, not the DataStream. 

Regarding "generate timestamps and watermarks and validate that field and extracted 
timestamp are the same": does the validation happen at runtime? If so, I 
prefer the latter one. 

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful to support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could be appended or could replace an existing field.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
--- End diff --

Adding `private[flink]` to the key and value type infos will make them public in 
Java. I'm not sure whether that is a good idea. I would prefer not to expose them 
to users (Java users), and the reflection only happens at compile time, which I 
think is fine. 




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134467#comment-16134467
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, 
"keyTypeInfo")
--- End diff --

Adding `private[flink]` to the key and value type infos will make them public in 
Java. I'm not sure whether that is a good idea. I would prefer not to expose them 
to users (Java users), and the reflection only happens at compile time, which I 
think is fine. 


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -108,30 +112,38 @@ object AggregateUtil {
   outputArity,
   needRetract = false,
   needMerge = false,
-  needReset = false
+  needReset = false,
+  accConfig = Some(DataViewConfig(accSpecs, isUseState))
 )
 
+val accConfig = accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
 if (isRowTimeType) {
   if (isRowsClause) {
 // ROWS unbounded over process function
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
   CRowTypeInfo(inputTypeInfo),
-  queryConfig)
+  queryConfig,
+  accConfig)
--- End diff --

+1 to do this. A great improvement! 




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134468#comment-16134468
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -108,30 +112,38 @@ object AggregateUtil {
   outputArity,
   needRetract = false,
   needMerge = false,
-  needReset = false
+  needReset = false,
+  accConfig = Some(DataViewConfig(accSpecs, isUseState))
 )
 
+val accConfig = accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
 if (isRowTimeType) {
   if (isRowsClause) {
 // ROWS unbounded over process function
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
   CRowTypeInfo(inputTypeInfo),
-  queryConfig)
+  queryConfig,
+  accConfig)
--- End diff --

+1 to do this. A great improvement! 


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Closed] (FLINK-7269) Refactor passing of dynamic properties

2017-08-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7269.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via bd70a00019f2c9fc01653d0229308635529aad73

> Refactor passing of dynamic properties
> --
>
> Key: FLINK-7269
> URL: https://issues.apache.org/jira/browse/FLINK-7269
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.3.1
>Reporter: Till Rohrmann
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> In order to set dynamic properties when loading the {{Configuration}} via 
> {{GlobalConfiguration.loadConfiguration}}, we currently set a static field in 
> {{GlobalConfiguration}} which is read whenever we load the {{Configuration}}.
> I think this is not a good pattern, and I propose to remove this functionality. 
> Instead we should explicitly add the dynamic properties to the loaded 
> {{Configuration}} at the start of the application.





[GitHub] flink pull request #4415: [FLINK-7269] Refactor passing of dynamic propertie...

2017-08-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4415




[jira] [Commented] (FLINK-7269) Refactor passing of dynamic properties

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134511#comment-16134511
 ] 

ASF GitHub Bot commented on FLINK-7269:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4415


> Refactor passing of dynamic properties
> --
>
> Key: FLINK-7269
> URL: https://issues.apache.org/jira/browse/FLINK-7269
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.3.1
>Reporter: Till Rohrmann
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> In order to set dynamic properties when loading the {{Configuration}} via 
> {{GlobalConfiguration.loadConfiguration}}, we currently set a static field in 
> {{GlobalConfiguration}} which is read whenever we load the {{Configuration}}.
> I think this is not a good pattern, and I propose to remove this functionality. 
> Instead we should explicitly add the dynamic properties to the loaded 
> {{Configuration}} at the start of the application.





[GitHub] flink issue #3356: [FLINK-5253] Remove special treatment of "dynamic propert...

2017-08-20 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3356
  
@tillrohrmann Is this issue still valid after FLINK-7269? (#4415)




[jira] [Commented] (FLINK-5253) Remove special treatment of "dynamic properties"

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134521#comment-16134521
 ] 

ASF GitHub Bot commented on FLINK-5253:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3356
  
@tillrohrmann Is this issue still valid after FLINK-7269? (#4415)


> Remove special treatment of "dynamic properties"
> 
>
> Key: FLINK-5253
> URL: https://issues.apache.org/jira/browse/FLINK-5253
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>  Labels: flip-6
>
> The YARN client accepts configuration keys as command line parameters.
> Currently these are sent to the AppMaster and TaskManager as "dynamic 
> properties", encoded in a special way via environment variables.
> The mechanism is quite fragile. We should simplify it:
>   - The YARN client takes the local {{flink-conf.yaml}} as the base.
>   - It overwrites config entries with command line properties when preparing 
> the configuration to be shipped to YARN container processes (JM / TM)
>   - No additional handling necessary
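
The simplification proposed above amounts to a plain overlay: start from the base configuration and let command line entries win on key collisions. A minimal sketch in Java, assuming plain maps stand in for the contents of {{flink-conf.yaml}} and for the command line properties. The class `ConfigOverlaySketch` and its `overlay` method are hypothetical names, not part of Flink, and the keys are used only as illustrative values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not Flink's actual API: plain maps stand in for
// the contents of flink-conf.yaml and for the command line properties.
final class ConfigOverlaySketch {

    /** Returns a new map with the base entries, overwritten by any command line entries. */
    static Map<String, String> overlay(Map<String, String> base, Map<String, String> commandLine) {
        Map<String, String> effective = new LinkedHashMap<>(base); // copy, so the base stays untouched
        effective.putAll(commandLine); // command line wins on key collisions
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> base = new LinkedHashMap<>();
        base.put("taskmanager.numberOfTaskSlots", "1");
        base.put("jobmanager.rpc.port", "6123");

        Map<String, String> commandLine = new LinkedHashMap<>();
        commandLine.put("taskmanager.numberOfTaskSlots", "4");

        // The merged map is what would be shipped to the container processes.
        System.out.println(overlay(base, commandLine));
    }
}
```

Because the merge happens once on the client, the container processes would only ever see one ordinary configuration, which is the point of the "no additional handling" bullet.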





[GitHub] flink issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink s...

2017-08-20 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4566
  
It's a bash command, AFAIK.




[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134547#comment-16134547
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4566
  
It's a bash command, AFAIK.


> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-the-box experience for users who otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.





[jira] [Closed] (FLINK-6835) Document the checkstyle requirements

2017-08-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6835.
---
Resolution: Fixed

13fc61c2e850c3ca23904f2e7eba2091f2b83f75

> Document the checkstyle requirements
> 
>
> Key: FLINK-6835
> URL: https://issues.apache.org/jira/browse/FLINK-6835
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> We should document the checkstyle requirements somewhere.





[jira] [Assigned] (FLINK-4831) Implement a slf4j metric reporter

2017-08-20 Thread Sumit Sarin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumit Sarin reassigned FLINK-4831:
--

Assignee: Sumit Sarin

> Implement a slf4j metric reporter
> -
>
> Key: FLINK-4831
> URL: https://issues.apache.org/jira/browse/FLINK-4831
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.2
>Reporter: Chesnay Schepler
>Assignee: Sumit Sarin
>Priority: Minor
>  Labels: easyfix, starter
>
> For debugging purposes it would be very useful to have a log4j metric 
> reporter. If you don't want to set up a metric backend you currently have to 
> rely on JMX, which a) works a bit differently than other reporters (for 
> example it doesn't extend AbstractReporter) and b) makes it a bit tricky to 
> analyze results as metrics are cleaned up once a job finishes.





[GitHub] flink issue #4415: [FLINK-7269] Refactor passing of dynamic properties

2017-08-20 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4415
  
@tillrohrmann @zentol the whole reason for the static field is to allow the 
various sites that call GlobalConfiguration.loadConfiguration to obtain the 
dynamic properties. Any property could be overridden.  Since there are many, 
many such sites, this patch causes a potential regression.  

Is there a driving rationale for this aside from a general dislike for 
statics?




[jira] [Commented] (FLINK-7269) Refactor passing of dynamic properties

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134553#comment-16134553
 ] 

ASF GitHub Bot commented on FLINK-7269:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4415
  
@tillrohrmann @zentol the whole reason for the static field is to allow the 
various sites that call GlobalConfiguration.loadConfiguration to obtain the 
dynamic properties. Any property could be overridden.  Since there are many, 
many such sites, this patch causes a potential regression.  

Is there a driving rationale for this aside from a general dislike for 
statics?


> Refactor passing of dynamic properties
> --
>
> Key: FLINK-7269
> URL: https://issues.apache.org/jira/browse/FLINK-7269
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.3.1
>Reporter: Till Rohrmann
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> In order to set dynamic properties when loading the {{Configuration}} via 
> {{GlobalConfiguration.loadConfiguration}}, we currently set a static field in 
> {{GlobalConfiguration}} which is read whenever we load the {{Configuration}}.
> I think this is not a good pattern. I propose to remove this functionality. 
> Instead we should explicitly add the dynamic properties to the loaded 
> {{Configuration}} at the start of the application.





[jira] [Commented] (FLINK-7269) Refactor passing of dynamic properties

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134555#comment-16134555
 ] 

ASF GitHub Bot commented on FLINK-7269:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4415
  
There is only one true fix: to propagate the configuration instance 
everywhere so that only the entrypoint calls loadConfiguration.


> Refactor passing of dynamic properties
> --
>
> Key: FLINK-7269
> URL: https://issues.apache.org/jira/browse/FLINK-7269
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.3.1
>Reporter: Till Rohrmann
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> In order to set dynamic properties when loading the {{Configuration}} via 
> {{GlobalConfiguration.loadConfiguration}}, we currently set a static field in 
> {{GlobalConfiguration}} which is read whenever we load the {{Configuration}}.
> I think this is not a good pattern. I propose to remove this functionality. 
> Instead we should explicitly add the dynamic properties to the loaded 
> {{Configuration}} at the start of the application.





[GitHub] flink issue #4415: [FLINK-7269] Refactor passing of dynamic properties

2017-08-20 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4415
  
There is only one true fix: to propagate the configuration instance 
everywhere so that only the entrypoint calls loadConfiguration.
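
A rough illustration of that idea, as a sketch only: the entrypoint loads the configuration once (dynamic properties included) and hands the same instance to every component through its constructor. All names below (`ConfigPropagationSketch`, `loadOnce`, `Component`, `example.key`) are hypothetical, and a `Map` stands in for Flink's `Configuration`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// All names here are hypothetical; a Map stands in for Flink's Configuration.
final class ConfigPropagationSketch {

    // Counts loads, only to make the "load exactly once" property visible.
    static int loadCount = 0;

    /** Stand-in for the single load at the entrypoint, with dynamic properties applied once. */
    static Map<String, String> loadOnce(Map<String, String> dynamicProperties) {
        loadCount++;
        Map<String, String> config = new HashMap<>();
        config.put("example.key", "from-file"); // pretend this came from the config file
        config.putAll(dynamicProperties);       // dynamic properties override file entries
        return Collections.unmodifiableMap(config);
    }

    /** A component that receives the configuration instead of loading it itself. */
    static final class Component {
        private final Map<String, String> config;

        Component(Map<String, String> config) {
            this.config = config;
        }

        String value(String key) {
            return config.get(key);
        }
    }

    public static void main(String[] args) {
        Map<String, String> dynamic = new HashMap<>();
        dynamic.put("example.key", "from-command-line");

        // Only the entrypoint loads; everything else is handed the instance.
        Map<String, String> config = loadOnce(dynamic);
        Component a = new Component(config);
        Component b = new Component(config);

        System.out.println(a.value("example.key") + " / " + b.value("example.key"));
    }
}
```

With this shape no static field is needed, because no component ever reloads the configuration and therefore none can miss the overrides.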




[jira] [Created] (FLINK-7482) StringWriter to support compression

2017-08-20 Thread Felix Cheung (JIRA)
Felix Cheung created FLINK-7482:
---

 Summary: StringWriter to support compression
 Key: FLINK-7482
 URL: https://issues.apache.org/jira/browse/FLINK-7482
 Project: Flink
  Issue Type: Bug
  Components: filesystem-connector
Affects Versions: 1.3.2
Reporter: Felix Cheung


Is it possible to have StringWriter support compression like 
AvroKeyValueSinkWriter or SequenceFileWriter?






[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-20 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r134142624
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -108,11 +139,63 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = 
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
/**
+* Registers use of job-related BLOBs.
+* 
+* Using any other method to access BLOBs, e.g. {@link #getFile}, is 
only valid within calls
+* to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #releaseJob(JobID)
+*/
+   public void registerJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
+   }
+   }
+
+   /**
+* Unregisters use of job-related BLOBs and allow them to be released.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #registerJob(JobID)
+*/
+   public void releaseJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+
+   if (ref == null) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of registerJob() calls");
--- End diff --

Including jobId would help troubleshooting.
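
A minimal sketch of what this could look like, under simplifying assumptions: `String` replaces `JobID`, the warning text is returned instead of being passed to `LOG.warn` (to avoid a logging dependency), and an entry is dropped as soon as its count reaches zero, which is not how the PR handles cleanup. The class name `ReleaseJobSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ref-counting in the diff: String replaces JobID
// and the warning is returned rather than logged.
final class ReleaseJobSketch {

    private final Map<String, Integer> jobRefCounters = new HashMap<>();

    void registerJob(String jobId) {
        synchronized (jobRefCounters) {
            jobRefCounters.merge(jobId, 1, Integer::sum); // insert 1 or increment
        }
    }

    /** Returns null on success, or the warning text (including the job ID) on misuse. */
    String releaseJob(String jobId) {
        synchronized (jobRefCounters) {
            Integer refs = jobRefCounters.get(jobId);
            if (refs == null) {
                // The job ID in the message identifies which caller is unbalanced.
                return "improper use of releaseJob() without a matching number of "
                    + "registerJob() calls for job " + jobId;
            }
            if (refs == 1) {
                jobRefCounters.remove(jobId); // simplified: the PR keeps the entry for delayed cleanup
            } else {
                jobRefCounters.put(jobId, refs - 1);
            }
            return null;
        }
    }

    public static void main(String[] args) {
        ReleaseJobSketch cache = new ReleaseJobSketch();
        cache.registerJob("job-1");
        cache.releaseJob("job-1");
        // Second release has no matching register, so a warning comes back.
        System.out.println(cache.releaseJob("job-1"));
    }
}
```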




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-20 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r134142721
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -108,11 +139,63 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = 
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
/**
+* Registers use of job-related BLOBs.
+* 
+* Using any other method to access BLOBs, e.g. {@link #getFile}, is 
only valid within calls
+* to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #releaseJob(JobID)
+*/
+   public void registerJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
--- End diff --

Should keepUntil be modified (in case the code at line 193 runs)?
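
One possible reading of the question, sketched under stated assumptions: the release and cleanup logic is not part of the quoted hunk, so everything beyond `references`, `keepUntil`, and `registerJob` is guessed. Here `releaseJob` sets `keepUntil` when the count drops to zero, and `registerJob` resets it when the job is registered again. Whether the reset is strictly required depends on how the cleanup task checks entries; the sketch resets it defensively. `KeepUntilSketch`, the explicit `nowMillis` parameter, and `String` in place of `JobID` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: field names follow the diff and the review comment, but the
// release and cleanup logic is assumed, since it is not shown in the quoted hunk.
final class KeepUntilSketch {

    static final class RefCount {
        int references = 0;
        long keepUntil = -1; // -1 means "no cleanup scheduled"
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    KeepUntilSketch(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    void registerJob(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null) {
                ref = new RefCount();
                jobRefCounters.put(jobId, ref);
            } else {
                ref.keepUntil = -1; // job is in use again, so cancel any pending cleanup
            }
            ++ref.references;
        }
    }

    void releaseJob(String jobId, long nowMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return; // misuse; the code in the diff logs a warning here
            }
            if (--ref.references == 0) {
                ref.keepUntil = nowMillis + cleanupIntervalMillis; // schedule delayed cleanup
            }
        }
    }

    /** Returns the scheduled cleanup time for the job, or -1 if none is scheduled or the job is unknown. */
    long keepUntil(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            return ref == null ? -1 : ref.keepUntil;
        }
    }

    /** True if a cleanup pass at nowMillis would delete this job's BLOBs. */
    boolean isEligibleForCleanup(String jobId, long nowMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            return ref != null
                && ref.references <= 0
                && ref.keepUntil > 0
                && nowMillis >= ref.keepUntil;
        }
    }

    public static void main(String[] args) {
        KeepUntilSketch cache = new KeepUntilSketch(1000L);
        cache.registerJob("job-1");
        cache.releaseJob("job-1", 5000L);
        System.out.println(cache.isEligibleForCleanup("job-1", 6000L));
        cache.registerJob("job-1"); // re-registration cancels the pending cleanup
        System.out.println(cache.isEligibleForCleanup("job-1", 6000L));
    }
}
```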




[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134687#comment-16134687
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r134142721
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -108,11 +139,63 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = 
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
/**
+* Registers use of job-related BLOBs.
+* 
+* Using any other method to access BLOBs, e.g. {@link #getFile}, is 
only valid within calls
+* to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #releaseJob(JobID)
+*/
+   public void registerJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
--- End diff --

Should keepUntil be modified (in case the code at line 193 runs)?


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.





[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134688#comment-16134688
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r134142624
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -108,11 +139,63 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = 
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
/**
+* Registers use of job-related BLOBs.
+* 
+* Using any other method to access BLOBs, e.g. {@link #getFile}, is 
only valid within calls
+* to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #releaseJob(JobID)
+*/
+   public void registerJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
+   }
+   }
+
+   /**
+* Unregisters use of job-related BLOBs and allow them to be released.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #registerJob(JobID)
+*/
+   public void releaseJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+
+   if (ref == null) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of registerJob() calls");
--- End diff --

Including jobId would help troubleshooting.


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
--- End diff --

please make the `map` private




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to 
care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a 
{@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
--- End diff --

Please mark `elementTypeInfo` as `@transient`. Also, do we want the type 
info to be accessible to users?




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134119014
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
--- End diff --

I find that the dataview term is defined in many places. Can we create a 
method to generate the term name, such as `createDataViewTerm(index: Int, 
fieldName: String)`?
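
A sketch of such a helper, rendered in Java for illustration although the PR itself is Scala. It assumes the accumulator term is always `acc` followed by the index, so that the result matches the `${accTerm}_${field.getName}_dataview` pattern in the diff; `DataViewTermSketch` is a hypothetical class name.

```java
// Hypothetical Java rendering of the suggested helper; the PR itself is Scala.
final class DataViewTermSketch {

    /** Builds the generated field name for a data view of the accumulator at the given index. */
    static String createDataViewTerm(int index, String fieldName) {
        // Assumes the accumulator term is "acc" + index.
        return "acc" + index + "_" + fieldName + "_dataview";
    }

    public static void main(String[] args) {
        System.out.println(createDataViewTerm(0, "map")); // prints "acc0_map_dataview"
    }
}
```

Keeping the naming rule in one place means the setter code and the member declarations cannot drift apart if the pattern changes.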




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code 
null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def remove(key: K): Unit = map.remove(key)
+
+  /**
+* Returns whether there exists the given mapping.
+*
+* @param key The key of the mapping.
+* @return True

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137525
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

I think we do not need `StateViewUtils` here. We can create the MapView 
directly in the generated code, because we already have the RuntimeContext and 
the StateDescriptor.
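
A rough sketch of what generating the creation statement directly could look like. This is only an illustration: `CreationSketch` and `genMapViewCreation` are hypothetical names, and the `StateMapView` constructor taking a `MapState` is an assumption rather than something shown in the diff.

```scala
object CreationSketch {
  // Builds the Java statement that the generated function would run once the
  // RuntimeContext is available. All three arguments are terms (variable names)
  // in the generated code, not runtime objects.
  // Assumption: StateMapView has a constructor that accepts the MapState.
  def genMapViewCreation(viewTerm: String, descTerm: String, ctxTerm: String): String =
    s"$viewTerm = new org.apache.flink.table.dataview.StateMapView(" +
      s"$ctxTerm.getMapState(" +
      s"(org.apache.flink.api.common.state.MapStateDescriptor) $descTerm));"
}
```

With this shape the helper object is no longer referenced from the generated code; only the state lookup on the context and the view constructor remain.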




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
--- End diff --

Make the list private; otherwise Java users can access it. 
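
A minimal sketch of the idea, assuming a simplified stand-in class (`SketchListView` is hypothetical and is not the class from the PR):

```scala
import java.util

// Hypothetical stand-in for the ListView under review.
class SketchListView[T] {
  // `private` keeps the backing list out of the public API, so no public
  // accessor named `list` is exposed to Scala or Java callers.
  private val list = new util.ArrayList[T]()

  // Appends a value; the Boolean returned by ArrayList.add is discarded.
  def add(value: T): Unit = list.add(value)

  // Exposes iteration only, not the mutable list type itself.
  def get: java.lang.Iterable[T] = list
}
```

Callers can still add and iterate, but they can no longer reach the `ArrayList` itself through a public member.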




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151559
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adding the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  /**
+* Copy from a list instance.
+*
+* @param t List instance.
+* @return A copy of this list instance
+*/
+  def copyFrom(t: util.List[T]): ListView[T] = {
--- End diff --

The `copyFrom` method can be accessed by users. We should avoid this.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134139167
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Do we need this? It is only used to add `RuntimeContext` to the member area, but 
`RuntimeContext` is only used in `initialize`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118959
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
--- End diff --

Please fix the indentation here.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134152736
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
--- End diff --

Yes, I think we can use accumulator type info instead of field information.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134138469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
--- End diff --

`addReusableDataViewConfig` should live in `AggregationCodeGenerator`. I would 
also change this method to `addReusableDataView(spec: DataViewSpec): String`, 
where the returned String is the term of the data view member variable. The 
data view creation code can then be added to `reusableDataViewStatements`, and 
the contents of `reusableDataViewStatements` can finally be emitted into 
`initialize(ctx)` (or `open(ctx)`, as I suggested).
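
The proposed shape could be sketched roughly as follows. This is an illustration under assumptions: `SketchSpec`, `SketchAggCodeGen`, and `genOpen` are hypothetical names, the spec is reduced to three fields, and the creation code is left as a placeholder.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for DataViewSpec.
final case class SketchSpec(accIndex: Int, fieldName: String, viewType: String)

class SketchAggCodeGen {
  // Member declarations of the generated class (insertion-ordered, de-duplicated).
  val reusableMemberStatements = mutable.LinkedHashSet[String]()
  // Data view creation statements, emitted later into open(ctx).
  val reusableDataViewStatements = mutable.LinkedHashSet[String]()

  // Registers one data view and returns the term of its member variable.
  def addReusableDataView(spec: SketchSpec): String = {
    val term = s"acc${spec.accIndex}_${spec.fieldName}_dataview"
    reusableMemberStatements += s"transient ${spec.viewType} $term = null;"
    // Placeholder: the real creation code would use the context and a descriptor.
    reusableDataViewStatements += s"$term = null; // create ${spec.viewType} here"
    term
  }

  // Emits the collected creation statements into a generated open(ctx) method.
  def genOpen(ctxTerm: String): String =
    s"""
       |public void open(org.apache.flink.api.common.functions.RuntimeContext $ctxTerm) {
       |  ${reusableDataViewStatements.mkString("\n  ")}
       |}
       |""".stripMargin
}
```

Because the context only appears as a parameter of the generated `open` method, this layout also avoids keeping the `RuntimeContext` as a member variable.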




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118741
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
--- End diff --

I'm not sure whether it is good to add `private[flink]`, because the fields 
are actually `public` for Java users.

Also, please make them `@transient`.
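
A small sketch of the visibility concern, using a hypothetical stand-in (`SketchScope` and `SketchMapView` are not from the PR, and `String` replaces `TypeInformation` to keep the example dependency-free). It relies on the Scala compiler emitting qualified-private members as public in bytecode, which is the behavior the comment above describes:

```scala
object SketchScope {
  // Hypothetical stand-in for MapView.
  class SketchMapView(
      private[SketchScope] val keyTypeInfo: String, // qualified private
      private val valueTypeInfo: String) {          // plain private
    def describe: String = s"$keyTypeInfo -> $valueTypeInfo"
  }

  // Names of the public methods visible through Java reflection,
  // which is also what a Java caller can invoke.
  def publicMethodNames(c: Class[_]): Set[String] =
    c.getMethods.map(_.getName).toSet
}
```

Checking `SketchScope.publicMethodNames(classOf[SketchScope.SketchMapView])` is one way to see which accessors a Java user could call: the qualified-private accessor is expected to show up there, while the plain `private` one is not.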




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -105,6 +108,13 @@ class AggregationCodeGenerator(
   inFields => for (f <- inFields) yield javaClasses(f)
 }
 
+// define runtimeContext as member variable
+val ctxTerm = s"runtimeContext"
--- End diff --

I think we do not need to make the runtimeContext a member variable. 




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134775#comment-16134775
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -105,6 +108,13 @@ class AggregationCodeGenerator(
   inFields => for (f <- inFields) yield javaClasses(f)
 }
 
+// define runtimeContext as member variable
+val ctxTerm = s"runtimeContext"
--- End diff --

I think we do not need to make the runtimeContext a member variable. 


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134785#comment-16134785
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134153349
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -80,7 +82,8 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: Option[DataViewConfig])
--- End diff --

It seems that `accConfig` is always `Some(x)`. Do we need the `Option`?
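
To make the trade-off concrete, here is a minimal sketch with a hypothetical stand-in for the config (`SketchConfig` and `OptionSketch` are illustrative names; only the flag is modelled):

```scala
// Hypothetical stand-in for DataViewConfig.
final case class SketchConfig(isStateBackedDataViews: Boolean)

object OptionSketch {
  // Shape used in the diff: explicit isDefined followed by get.
  def viaGet(cfg: Option[SketchConfig]): Boolean =
    cfg.isDefined && cfg.get.isStateBackedDataViews

  // Equivalent check without calling get.
  def viaExists(cfg: Option[SketchConfig]): Boolean =
    cfg.exists(_.isStateBackedDataViews)

  // If every caller passes Some(x), the wrapper can be dropped entirely.
  def direct(cfg: SketchConfig): Boolean =
    cfg.isStateBackedDataViews
}
```

If `None` never occurs in practice, the `direct` form removes both the `Option` parameter and the repeated `isDefined`/`get` checks; if `None` is still needed, `exists` keeps the check shorter.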


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134779#comment-16134779
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151559
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adding the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  /**
+* Copy from a list instance.
+*
+* @param t List instance.
+* @return A copy of this list instance
+*/
+  def copyFrom(t: util.List[T]): ListView[T] = {
--- End diff --

The `copyFrom` method can be accessed by users. We should avoid this.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
+   |  $ctxTerm);
+   """.stripMargin
+  } else if (dataViewField.getType == classOf[ListView[_]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createListView($descFieldTerm,
--- End diff --

Same as above: we can generate the creation code directly.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134784#comment-16134784
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118959
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
--- End diff --

Please fix the indentation here.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/StateViewUtils.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.dataview
+
+import java.util
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.state._
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * State view utils to create [[StateListView]] or [[StateMapView]]..
+  */
+object StateViewUtils {
--- End diff --

We may not need this, as we can code generate the creation code.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134770#comment-16134770
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
--- End diff --

Please make `map` private.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134771#comment-16134771
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134119014
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
--- End diff --

The data view term is defined in many places. Can we create a method to 
generate the term name, such as `createDataViewTerm(index: Int, 
fieldName: String)`?
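
A minimal sketch of what such a helper could look like. The enclosing object `DataViewTerms` is hypothetical, and the sketch assumes that the accumulator term is always `acc` followed by the aggregate index, which is how the terms in `genCleanUpDataView` are built:

```scala
// Hypothetical helper object; not part of the pull request.
object DataViewTerms {

  /**
    * Builds the member variable name of a data view.
    *
    * @param index     index of the aggregate function / accumulator
    * @param fieldName name of the data view field inside the accumulator
    */
  def createDataViewTerm(index: Int, fieldName: String): String =
    s"acc${index}_${fieldName}_dataview"
}
```

With this, the call sites that currently build the string inline would call `DataViewTerms.createDataViewTerm(i, spec.field.getName)` instead, so the naming scheme lives in one place.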




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134782#comment-16134782
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134139167
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Do we need this? It is only used to add `RuntimeContext` to the member area, but 
`RuntimeContext` is only used in `initialize`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134134801
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +107,11 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Clean up for the accumulators.
+*/
+  def cleanUp()
--- End diff --

`cleanup` is also a word, so we do not need an uppercase `U` here.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134153349
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -80,7 +82,8 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: Option[DataViewConfig])
--- End diff --

It seems that `accConfig` is always `Some(x)`. Do we need the `Option`?
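
To illustrate the difference, here is a small sketch. `DataViewConfigSketch` is a hypothetical stand-in that models only the `isStateBackedDataViews` flag seen in the diff, and the second variant is only valid if every caller really passes `Some(x)`:

```scala
// Hypothetical stand-in for the PR's DataViewConfig; only one flag is modelled.
final case class DataViewConfigSketch(isStateBackedDataViews: Boolean)

object AccConfigSketch {

  // Current shape: every use site has to unwrap the Option first.
  def usesStateWithOption(accConfig: Option[DataViewConfigSketch]): Boolean =
    accConfig.isDefined && accConfig.get.isStateBackedDataViews

  // Shape without the Option: the check becomes a plain field access.
  def usesState(accConfig: DataViewConfigSketch): Boolean =
    accConfig.isStateBackedDataViews
}
```

If the `Option` turns out to be needed after all, `accConfig.exists(_.isStateBackedDataViews)` would express the same check without calling `get`.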




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134135181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
+}
+setters.mkString("\n")
+  } else {
+""
+  }
+}
+
+def genCleanUpDataView: String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val cleanUpDataViews = new StringBuilder
+for (i <- aggs.indices) yield {
+  val setters = for (spec <- accConfig.get.accSpecs(i)) yield {
+val dataViewTerm = s"acc${i}_${spec.field.getName}_dataview"
+val cleanUp =
+  s"""
+|$dataViewTerm.clear();
+  """.stripMargin
+cleanUpDataViews.append(cleanUp)
+  }
+}
+
+cleanUpDataViews.toString()
+  } else {
+""
+  }
+}
+
+def genInitialize: String = {
+
+j"""
+   |  public final void initialize(
--- End diff --

I would like to rename the method to `open(ctx)` so that we can use 
the `reusableOpenStatements` and `reuseOpenCode()` of `CodeGenerator` to 
generate the content of `open`. Currently, `genInitialize` is easily 
confused with `reuseInitCode()`.
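
Roughly, the idea could be sketched as follows. `OpenCodeSketch` is a simplified, hypothetical stand-in for `CodeGenerator`, and the type of `reusableOpenStatements` and the `genOpen` method are assumptions made for this example:

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-in for CodeGenerator; not Flink's actual class.
class OpenCodeSketch {

  // Java statements that should run once when the generated function is opened.
  val reusableOpenStatements: mutable.LinkedHashSet[String] =
    mutable.LinkedHashSet.empty[String]

  // Joins all registered statements into the body of open(ctx).
  def reuseOpenCode(): String = reusableOpenStatements.mkString("\n")

  // Emits the Java source of the generated open method as a string.
  def genOpen(ctxTerm: String): String =
    s"""
       |public final void open(
       |    org.apache.flink.api.common.functions.RuntimeContext $ctxTerm) throws Exception {
       |  ${reuseOpenCode()}
       |}
       |""".stripMargin
}
```

Any code that needs the runtime context would then only add its statements to `reusableOpenStatements`, and the method body is assembled in one place.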




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134778#comment-16134778
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
+   |  $ctxTerm);
+   """.stripMargin
+  } else if (dataViewField.getType == classOf[ListView[_]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createListView($descFieldTerm,
--- End diff --

Same as above, we can code gen the creation.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134774#comment-16134774
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134134801
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +107,11 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Clean up for the accumulators.
+*/
+  def cleanUp()
--- End diff --

`cleanup` is also a word, so we do not need an uppercase `U` here.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134783#comment-16134783
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134135181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
+}
+setters.mkString("\n")
+  } else {
+""
+  }
+}
+
+def genCleanUpDataView: String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val cleanUpDataViews = new StringBuilder
+for (i <- aggs.indices) yield {
+  val setters = for (spec <- accConfig.get.accSpecs(i)) yield {
+val dataViewTerm = s"acc${i}_${spec.field.getName}_dataview"
+val cleanUp =
+  s"""
+|$dataViewTerm.clear();
+  """.stripMargin
+cleanUpDataViews.append(cleanUp)
+  }
+}
+
+cleanUpDataViews.toString()
+  } else {
+""
+  }
+}
+
+def genInitialize: String = {
+
+j"""
+   |  public final void initialize(
--- End diff --

I would like to rename the method to `open(ctx)` so that we can use 
the `reusableOpenStatements` and `reuseOpenCode()` of `CodeGenerator` to 
generate the content of `open`. Currently, `genInitialize` is easily 
confused with `reuseInitCode()`.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134777#comment-16134777
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code 
null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
+* @throws Exception Thrown if the system cannot acce

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134786#comment-16134786
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134138469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
--- End diff --

`addReusableDataViewConfig` should be in `AggregationCodeGenerator`. I would 
also like to change this method to `addReusableDataView(spec: DataViewSpec): 
String`, where the returned String is the term of the data view member variable. 
The data view creation code can then be added to `reusableDataViewStatements`, 
and the code of `reusableDataViewStatements` can finally be added to 
`initialize(ctx)` (or `open(ctx)`, as I suggested).
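
A rough sketch of this shape, under stated assumptions: `DataViewSpecSketch` is a hypothetical, reduced stand-in for `DataViewSpec`, the creation expression is passed in as a ready-made Java string, and `reuseDataViewCode()` is an illustrative name, not an existing method:

```scala
import scala.collection.mutable

// Hypothetical, reduced stand-in for the PR's DataViewSpec.
final case class DataViewSpecSketch(
    accIndex: Int,         // index of the accumulator that owns the view
    fieldName: String,     // name of the data view field
    viewTypeTerm: String,  // Java type of the generated member variable
    creationExpr: String)  // Java expression that creates the state-backed view

class DataViewCodeSketch {

  val reusableMemberStatements: mutable.LinkedHashSet[String] =
    mutable.LinkedHashSet.empty[String]
  val reusableDataViewStatements: mutable.LinkedHashSet[String] =
    mutable.LinkedHashSet.empty[String]

  /** Registers member and creation code for one data view and returns its term. */
  def addReusableDataView(spec: DataViewSpecSketch): String = {
    val term = s"acc${spec.accIndex}_${spec.fieldName}_dataview"
    reusableMemberStatements.add(s"transient ${spec.viewTypeTerm} $term = null;")
    reusableDataViewStatements.add(s"$term = ${spec.creationExpr};")
    term
  }

  /** Code to splice into the body of initialize(ctx) or open(ctx). */
  def reuseDataViewCode(): String = reusableDataViewStatements.mkString("\n")
}
```

Because both collections are sets, registering the same data view twice does not duplicate the generated member or its creation statement.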




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134780#comment-16134780
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/StateViewUtils.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.dataview
+
+import java.util
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.state._
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * State view utils to create [[StateListView]] or [[StateMapView]]..
+  */
+object StateViewUtils {
--- End diff --

We may not need this, as we can code generate the creation code.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134781#comment-16134781
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134152736
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
--- End diff --

Yes, I think we can use accumulator type info instead of field information.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134776#comment-16134776
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137525
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

I don't think we need `StateViewUtils` here. We can create the MapView directly
in the generated code, because we already have the RuntimeContext and the
StateDescriptor.
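
As a rough illustration of the suggestion above, the generator could emit the
state lookup itself instead of a call into a helper class. The sketch below only
builds the Java statement as a string. `MapViewInitSketch` and `genMapViewInit`
are hypothetical names, and the `StateMapView` constructor taking a `MapState`
is an assumption that the diff does not confirm.
`RuntimeContext#getMapState(MapStateDescriptor)` is the existing Flink call the
emitted statement would rely on.

```java
// Hypothetical helper for illustration only; not part of the pull request.
final class MapViewInitSketch {

    private MapViewInitSketch() {}

    /**
     * Builds the Java statement a code generator could emit to initialise a
     * state-backed map view from the runtime context and a state descriptor.
     */
    static String genMapViewInit(String viewFieldTerm, String ctxTerm, String descFieldTerm) {
        String descType = "org.apache.flink.api.common.state.MapStateDescriptor";
        // Assumption: StateMapView has a constructor that accepts the MapState handle.
        String viewType = "org.apache.flink.table.dataview.StateMapView";
        return viewFieldTerm + " = new " + viewType + "("
            + ctxTerm + ".getMapState((" + descType + ") " + descFieldTerm + "));";
    }

    public static void main(String[] args) {
        // Prints the statement that would be spliced into the generated function.
        System.out.println(genMapViewInit("acc0_map_dataview", "ctx", "acc0_map_dataview_desc"));
    }
}
```

With this shape the generated class depends only on the runtime context and the
deserialized descriptor, which is what the comment argues is already available.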


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134773#comment-16134773
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118741
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
--- End diff --

I'm not sure it is a good idea to add `private[flink]`, because these fields are
actually `public` for Java users.

Also, please make them `@transient`.
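
The comment does not spell out the reason for `@transient`. One common
motivation is that a field marked this way is skipped by Java serialization, so
type metadata is not shipped along with the object. The sketch below shows that
effect with plain Java serialization. `SketchMapView` and its `keyTypeName`
field are hypothetical stand-ins for `MapView` and its `TypeInformation`
fields, and whether Flink serializes the view this way is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical stand-in for a view class; this is not Flink's MapView.
final class SketchMapView implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stand-in for the type information: marked transient, so it is not serialized.
    private transient String keyTypeName;

    // The actual data is serialized as usual.
    private final HashMap<String, Integer> map = new HashMap<>();

    SketchMapView(String keyTypeName) {
        this.keyTypeName = keyTypeName;
    }

    String keyTypeName() {
        return keyTypeName;
    }

    void put(String key, int value) {
        map.put(key, value);
    }

    Integer get(String key) {
        return map.get(key);
    }

    /** Serializes the view to bytes and reads it back, as a copy. */
    static SketchMapView roundTrip(SketchMapView view) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(view);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchMapView) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After `roundTrip`, the copy still holds the map entries, while `keyTypeName()`
returns `null` because the transient field was never written. On the visibility
point: Scala compiles a `private[flink]` member to a public member in bytecode,
which is why Java callers can still reach it.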


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134769#comment-16134769
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

Please make `elementTypeInfo` `@transient`. Also, do we want the type info to be
accessible to users?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134772#comment-16134772
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
--- End diff --

Make the list private; otherwise Java users can access it.
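
For context, a Scala `val` declared without a modifier compiles to a public
accessor method, which is how Java code can reach the backing list. A minimal
Java sketch of the encapsulated shape the comment asks for follows.
`SketchListView` is a hypothetical stand-in, not the `ListView` from the pull
request, and returning a read-only view from `get()` is an illustrative choice
rather than something the review requests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a list view; this is not Flink's ListView.
final class SketchListView<T> {

    // The backing list is private, so callers cannot replace or mutate it directly.
    private final List<T> list = new ArrayList<>();

    /** Appends a value through the view's own API. */
    void add(T value) {
        list.add(value);
    }

    /** Exposes the elements as a read-only view of the backing list. */
    Iterable<T> get() {
        return Collections.unmodifiableList(list);
    }
}
```

Callers can still add and iterate, but an attempt to modify the list through
the returned iterable fails with `UnsupportedOperationException`.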


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.


