[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141208#comment-16141208
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135184770
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
--- End diff --

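The condition here is inverted. It should be:
```
if (accumulator.map.contains(String.valueOf(id))) {
    accumulator.count -= 1;
    accumulator.map.remove(String.valueOf(id));
}
```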

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135184770
  

The condition here is inverted. It should be:
```
if (accumulator.map.contains(String.valueOf(id))) {
    accumulator.count -= 1;
    accumulator.map.remove(String.valueOf(id));
}
```
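
A minimal standalone sketch of the corrected retract logic. It assumes a plain `java.util.HashMap` in place of Flink's `MapView` so that it runs without Flink on the classpath; the class name `CountDistinctRetractSketch` is hypothetical and not part of the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch: a HashMap stands in for Flink's MapView.
class CountDistinctRetractSketch {
    private final Map<String, Integer> map = new HashMap<>();
    private long count = 0L;

    // Count an id only the first time it is seen.
    void accumulate(long id) {
        String key = String.valueOf(id);
        if (!map.containsKey(key)) {
            map.put(key, 1);
            count += 1;
        }
    }

    // Retract only ids that are present; ids that were never added are ignored.
    void retract(long id) {
        String key = String.valueOf(id);
        if (map.containsKey(key)) {
            count -= 1;
            map.remove(key);
        }
    }

    long getValue() {
        return count;
    }
}
```

As in the test code under review, the map does not track how often an id was accumulated, so a single retract removes the id regardless of how many times it was added.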

 

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135181433
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

The `close()` method corresponds to the `open()` method. It is called only 
where `open()` was called before, e.g. from ProcTimeUnboundedOver's `close()` 
method. Otherwise, the UDAGG's `close()` will not be called. 

I modified the test case testGroupAggregateWithStateBackend to verify that 
`close()` is called.

What do you think?
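
The pairing described above could be sketched as follows. This is only an illustration under assumptions: `LifecycleAgg` and `OperatorSketch` are hypothetical names, not Flink classes. The point is that the aggregation's `close()` is reached only on paths where `open()` ran first.

```java
// Hypothetical interface standing in for a generated aggregation with a lifecycle.
interface LifecycleAgg {
    void open();
    void close();
}

// Hypothetical operator: it forwards close() only if it forwarded open() before.
class OperatorSketch {
    private final LifecycleAgg agg;
    private boolean opened = false;

    OperatorSketch(LifecycleAgg agg) {
        this.agg = agg;
    }

    void open() {
        agg.open();
        opened = true;
    }

    void close() {
        if (opened) {
            agg.close();
            opened = false;
        }
    }
}
```

A test can pass in a `LifecycleAgg` that counts calls and check that `close()` was invoked exactly once after `open()`.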


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141140#comment-16141140
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4355
  
The newly created methods `open(ctx)` and `close()` of `GeneratedAggregations` 
are not called by `AggregateAggFunction`, which is used in window aggregates. I 
suggest calling `open(ctx)` but passing a `RuntimeContext` that throws an 
exception in every method, to tell users `User Defined AggregateFunction is not 
supported to call open() and close() in window`. But this can be addressed in 
another issue. 

BTW, I think `AggregateCodeGenerator#generateAggregations` is too long 
at 500+ LOC. I would like to refactor it if you have no objection, @kaibozhou 
@fhueske. I have created 
[FLINK-7509](https://issues.apache.org/jira/browse/FLINK-7509).
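
One way to build such an "always throwing" context is a JDK dynamic proxy. The sketch below is an assumption-laden illustration: `ContextSketch` is a hypothetical, cut-down stand-in for Flink's `RuntimeContext`, and `ThrowingContextSketch` is not part of the pull request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical, cut-down stand-in for a runtime context interface.
interface ContextSketch {
    String getTaskName();
    int getIndexOfThisSubtask();
}

class ThrowingContextSketch {
    // Builds a context whose every method fails with an explanatory message.
    static ContextSketch create() {
        InvocationHandler handler = (proxy, method, args) -> {
            throw new UnsupportedOperationException(
                "User Defined AggregateFunction is not supported to call open() and close() in window"
                    + " (attempted: " + method.getName() + ")");
        };
        return (ContextSketch) Proxy.newProxyInstance(
            ContextSketch.class.getClassLoader(),
            new Class<?>[] {ContextSketch.class},
            handler);
    }
}
```

Because the handler intercepts every call, `toString()`, `hashCode()` and `equals()` on the proxy throw as well; a hand-written implementation would avoid that at the cost of more boilerplate.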


> Implementation of DataView to support state access for UDAGG
> ------------------------------------------------------------
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Created] (FLINK-7509) Refactorings to AggregateCodeGenerator

2017-08-24 Thread Jark Wu (JIRA)
Jark Wu created FLINK-7509:
--

 Summary: Refactorings to AggregateCodeGenerator
 Key: FLINK-7509
 URL: https://issues.apache.org/jira/browse/FLINK-7509
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Jark Wu


I think `AggregateCodeGenerator#generateAggregations` is currently too long at 
500+ LOC and hard to extend. I would like to refactor it if there are no 
objections.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141123#comment-16141123
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Maybe this method is also needed. State descriptors cannot be code generated 
because the TypeInformation is passed in by users.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141121#comment-16141121
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -178,13 +294,15 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = genDataViewFieldSetter(s"acc$i", i)
 j"""
|org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
|  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
+   |$setDataView
--- End diff --

yes



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141120#comment-16141120
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179087
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

State descriptors need the TypeInformation that is passed in by the user, so 
we cannot code generate the state descriptor. 
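
The workaround visible in the diff is to serialize the descriptor to a string at code-generation time and to deserialize it at runtime. A minimal sketch of that round trip, assuming JDK serialization and `java.util.Base64` instead of the `InstantiationUtil` and commons-codec calls used in the pull request; `DescriptorSketch` and `DescriptorCodec` are hypothetical names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

// Hypothetical stand-in for a state descriptor built from user-provided type info.
class DescriptorSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final String keyType;
    final String valueType;

    DescriptorSketch(String name, String keyType, String valueType) {
        this.name = name;
        this.keyType = keyType;
        this.valueType = valueType;
    }
}

class DescriptorCodec {
    // Encodes the object as a URL-safe Base64 string that fits into a string literal.
    static String serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    // Restores the object from the string embedded in the generated code.
    static Object deserialize(String data) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
```

The generated code then only needs to contain the encoded string and a call to the deserializer. The pull request additionally resolves classes through the thread's context class loader, which this sketch leaves out.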



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141085#comment-16141085
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135173258
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+s"acc${aggIndex}_${fieldName}_dataview"
+  }
 }
+
+case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], 
isStateBackedDataViews: Boolean)
--- End diff --

Yes, this class is not needed now.



[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135172148
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -759,10 +786,12 @@ object AggregateUtil {
 : GroupCombineFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, accTypes) = 
transformToAggregateFunctions(
+val isStateBackedDataViews = false
+val (aggFieldIndexes, aggregates, accTypes, accSpecs) = 
transformToAggregateFunctions(
--- End diff --

That makes sense.



[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135171980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -356,6 +486,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig != null && accConfig.isStateBackedDataViews) {
+  throw new CodeGenException("DataView doesn't support merge when 
the backend uses state.")
--- End diff --

I found that the exception would be thrown everywhere 
`generateAggregations(..)` is called.
I think we can print `funcName` in the exception message.
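
A minimal sketch of what including the function name could look like. The class `MergeCheckSketch`, its parameters and the use of `IllegalStateException` are assumptions for illustration; the pull request throws a `CodeGenException` from inside the code generator.

```java
// Hypothetical sketch: names are illustrative, not Flink's API.
class MergeCheckSketch {
    // Fails with a message that names the function being generated.
    static void checkMergeSupported(String funcName, boolean needMerge, boolean stateBackedDataViews) {
        if (needMerge && stateBackedDataViews) {
            throw new IllegalStateException(
                "DataView doesn't support merge when the backend uses state (function: "
                    + funcName + ").");
        }
    }
}
```

With the name in the message, a user can tell which of the many callers of `generateAggregations(..)` triggered the failure.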



[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2017-08-24 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141068#comment-16141068
 ] 

Eron Wright  commented on FLINK-5479:
-

Thinking out loud here, consider making use of Kafka's 
`max.message.time.difference.ms` as the basis for idle timeout.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to idle sources blocking watermark progression in downstream 
> operators (see FLINK-5017), the per-partition watermark mechanism in 
> {{FlinkKafkaConsumer}} is also blocked from advancing watermarks when a 
> partition is idle. The watermark of an idle partition is always 
> {{Long.MIN_VALUE}}, so the overall minimum watermark across all partitions 
> of a consumer subtask never advances.
> Kafka partitions that produce no data are not a common case, but it would 
> be good to handle this as well. I think we should have a localized solution 
> similar to FLINK-5017 for the per-partition watermarks in 
> {{AbstractFetcher}}.
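
The blocking effect described above can be illustrated with a minimal sketch. It is not the FlinkKafkaConsumer or AbstractFetcher code: the class IdleAwareWatermark, the PartitionState fields, and the idleTimeoutMs parameter are all hypothetical names, and treating a partition that has never produced a record as idle is a simplifying assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not the FlinkKafkaConsumer or AbstractFetcher code.
final class IdleAwareWatermark {

    /** Per-partition bookkeeping. Field names are assumptions for this sketch. */
    static final class PartitionState {
        long watermark = Long.MIN_VALUE;      // stays at Long.MIN_VALUE until a record arrives
        long lastRecordTime = Long.MIN_VALUE; // wall-clock time of the last record seen
    }

    /** Plain minimum: a single idle partition pins the result at Long.MIN_VALUE. */
    static long naiveMin(Map<Integer, PartitionState> partitions) {
        long min = Long.MAX_VALUE;
        for (PartitionState p : partitions.values()) {
            min = Math.min(min, p.watermark);
        }
        return min;
    }

    /** Minimum over the partitions that produced a record within idleTimeoutMs. */
    static long idleAwareMin(Map<Integer, PartitionState> partitions, long now, long idleTimeoutMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (PartitionState p : partitions.values()) {
            // The first check also avoids overflow in the subtraction below.
            boolean idle = p.lastRecordTime == Long.MIN_VALUE
                    || now - p.lastRecordTime > idleTimeoutMs;
            if (!idle) {
                min = Math.min(min, p.watermark);
                anyActive = true;
            }
        }
        // With no active partition there is nothing to advance to.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        Map<Integer, PartitionState> partitions = new LinkedHashMap<>();
        PartitionState active = new PartitionState();
        active.watermark = 1_000L;
        active.lastRecordTime = 5_000L;
        partitions.put(0, active);
        partitions.put(1, new PartitionState()); // idle: never produced a record

        System.out.println(naiveMin(partitions) == Long.MIN_VALUE);   // true
        System.out.println(idleAwareMin(partitions, 5_500L, 1_000L)); // 1000
    }
}
```

Whether an idle partition should be skipped, or instead be assigned a watermark derived from processing time, is a design choice this sketch does not settle.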





[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141044#comment-16141044
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4562
  
@NicoK Great, it's very useful for me, thank you very much!


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.
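
A small sketch of the pattern in question, assuming a stand-in Buffer class rather than Flink's actual network buffer type. The names RecycleGuard, unguarded, and guarded are hypothetical, and the guarded variant is only one possible way to make the contract consistent, not necessarily the fix applied in the pull request.

```java
// Hypothetical sketch; Buffer is a stand-in, not Flink's network buffer class.
final class RecycleGuard {

    static final class Buffer {
        boolean recycled;

        int size() {
            return 0;
        }

        void recycle() {
            recycled = true;
        }
    }

    /** Pattern from the issue: the try block dereferences buffer without a guard. */
    static int unguarded(Buffer buffer) {
        try {
            return buffer.size(); // throws NullPointerException when buffer is null
        } finally {
            if (buffer != null) { // too late to protect the dereference above
                buffer.recycle();
            }
        }
    }

    /** One consistent alternative: reject null up front, then recycle unconditionally. */
    static int guarded(Buffer buffer) {
        if (buffer == null) {
            throw new IllegalArgumentException("buffer must not be null");
        }
        try {
            return buffer.size();
        } finally {
            buffer.recycle();
        }
    }
}
```

The other consistent option would be to treat null as a legal input and guard the dereference in the try block as well; which one fits depends on the caller's contract.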





[GitHub] flink issue #4562: [FLINK-7402] Fix ineffective null check in NettyMessage#w...

2017-08-24 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4562
  
@NicoK Great, it's very useful for me, thank you very much!




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135163096
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140991#comment-16140991
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135162070
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Yes, this was just an oversight during code refactoring.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135162070
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Yes, this was just an oversight during code refactoring.




[jira] [Updated] (FLINK-7490) UDF Agg throws Exception when flink-table is loaded with AppClassLoader

2017-08-24 Thread Miguel Rui Pereira Marques (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Miguel Rui Pereira Marques updated FLINK-7490:
--
Affects Version/s: 1.4.0

> UDF Agg throws Exception when flink-table is loaded with AppClassLoader
> ---
>
> Key: FLINK-7490
> URL: https://issues.apache.org/jira/browse/FLINK-7490
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Miguel Rui Pereira Marques
>
> When a UDF aggregation for the Batch Table API is defined in the 
> FlinkUserCodeClassLoader and the Table API itself is loaded in the 
> AppClassLoader (the jar is included in the lib directory) this exception is 
> triggered:
> {panel:title=Exception}
> java.lang.Exception: The user defined 'open()' method caused an exception: 
> Table program cannot be compiled. This is a bug. Please file an issue.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.aggregate.DataSetAggFunction.compile(DataSetAggFunction.scala:35)
>   at 
> org.apache.flink.table.runtime.aggregate.DataSetAggFunction.open(DataSetAggFunction.scala:49)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>   ... 3 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 13: 
> Cannot determine simple type name "org"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
> ...
> {panel}
> Upon inspecting the code, I think this may be due to the use of 
> 'getClass.getClassLoader' instead of 
> 'getRuntimeContext.getUserCodeClassLoader' as the class loader argument to 
> 'compile' in the 'open' method of class 
> org.apache.flink.table.runtime.aggregate.DataSetAggFunction.





[jira] [Created] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-08-24 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7508:
---

 Summary: switch FlinkKinesisProducer to use KPL's ThreadingMode to 
ThreadedPool mode rather than Per_Request mode
 Key: FLINK-7508
 URL: https://issues.apache.org/jira/browse/FLINK-7508
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.3.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.4.0, 1.3.3


KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request 
model for all requests sent to AWS Kinesis, which is very expensive.

0.12.4 introduced a new [ThreadingMode - 
Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
 which will use a thread pool. This hugely improves KPL's performance and 
reduces consumed resources. By default, KPL still uses per-request mode. We 
should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This work depends on FLINK-7366 which upgrades KPL from 0.10 to 0.12.5.
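
To illustrate why a per-request threading model is expensive compared to a pooled one, here is a generic sketch built on java.util.concurrent only. It does not use the KPL API; ThreadingModeSketch, CountingFactory, perRequest, and pooled are hypothetical names, and the empty task bodies stand in for sending one request.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; it does not use the Kinesis Producer Library.
final class ThreadingModeSketch {

    /** Thread factory that counts how many threads it has created. */
    static final class CountingFactory implements ThreadFactory {
        final AtomicInteger created = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            created.incrementAndGet();
            return new Thread(r);
        }
    }

    /** One new thread per request: thread creation grows linearly with the load. */
    static int perRequest(int requests) throws InterruptedException {
        CountingFactory factory = new CountingFactory();
        Thread[] threads = new Thread[requests];
        for (int i = 0; i < requests; i++) {
            threads[i] = factory.newThread(() -> { /* send one request */ });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return factory.created.get();
    }

    /** Fixed pool: the number of threads is bounded by poolSize regardless of load. */
    static int pooled(int requests, int poolSize) throws InterruptedException {
        CountingFactory factory = new CountingFactory();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize, factory);
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> { /* send one request */ });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return factory.created.get();
    }
}
```

With 100 requests, perRequest creates 100 threads, while pooled with a pool size of 4 creates at most 4.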







[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140796#comment-16140796
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135126762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

I think `close()` is never called. So we can remove it, right?


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112454
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.createInstance())
--- End diff --

No need to add anything to the new instance. The map should be empty.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128323
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
--- End diff --

Shouldn't a count distinct with retraction increment a per-key counter in 
the MapView in `accumulate` and decrement that counter in `retract`? Only when 
the counter reaches 0 should the map entry be removed and `accumulator.count` 
be decremented.
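
A minimal sketch of the reference-counting scheme suggested here, assuming a plain java.util.HashMap as a stand-in for MapView so that it runs without Flink. CountDistinctSketch and Accum are hypothetical names, and this is not the code from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; HashMap stands in for MapView<String, Integer>.
final class CountDistinctSketch {

    static final class Accum {
        final Map<String, Integer> map = new HashMap<>(); // key -> number of occurrences
        long count;                                       // number of distinct keys
    }

    /** Increments the per-key counter; a new key also increments the distinct count. */
    static void accumulate(Accum acc, String id) {
        Integer seen = acc.map.get(id);
        if (seen == null) {
            acc.map.put(id, 1);
            acc.count += 1;
        } else {
            acc.map.put(id, seen + 1);
        }
    }

    /** Decrements the per-key counter; the key is dropped only when it reaches 0. */
    static void retract(Accum acc, String id) {
        Integer seen = acc.map.get(id);
        if (seen == null) {
            return; // nothing to retract for an unknown key
        }
        if (seen == 1) {
            acc.map.remove(id);
            acc.count -= 1;
        } else {
            acc.map.put(id, seen - 1);
        }
    }

    public static void main(String[] args) {
        Accum acc = new Accum();
        accumulate(acc, "a");
        accumulate(acc, "a");
        accumulate(acc, "b");
        retract(acc, "a");             // "a" is still present once
        System.out.println(acc.count); // 2
        retract(acc, "a");             // last "a" removed
        System.out.println(acc.count); // 1
    }
}
```

Ignoring a retraction for an unknown key is an assumption of this sketch; an implementation could also treat it as an error.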



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140788#comment-16140788
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135121779
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+s"acc${aggIndex}_${fieldName}_dataview"
+  }
 }
+
+case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], 
isStateBackedDataViews: Boolean)
--- End diff --

We only need a `DataViewConfig` if the views are backed by state, right? So 
we can remove the `Boolean` field `isStateBackedDataViews` and make the config 
an optional parameter of the code generator.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140784#comment-16140784
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107609
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+@transient private[flink] val keyTypeInfo: TypeInformation[K],
+@transient private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  private[flink] var map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code 
null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
  

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135140565
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Do we need this method? State descriptors can be instantiated by generated 
code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135126762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

I think `close()` is never called. So we can remove it, right?




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140799#comment-16140799
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128744
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
+   } 

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109036
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
--- End diff --

lost -> list, remove the `` tag




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135121779
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+s"acc${aggIndex}_${fieldName}_dataview"
+  }
 }
+
+case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], 
isStateBackedDataViews: Boolean)
--- End diff --

We only need a `DataViewConfig` if the views are backed by state, right? So 
we can remove the `boolean` `isStateBackedDataViews` field and make the config an 
optional parameter of the code generator.
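
A minimal sketch of the suggestion above, written in plain Java with `Optional` standing in for Scala's `Option`. The class `CodeGeneratorSketch` and the use of strings as data view specs are hypothetical and only illustrate how the boolean flag can be derived from the presence of the config; this is not the PR's code.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the code generator; names are illustrative only.
class CodeGeneratorSketch {
    // Present only when the data views are backed by state.
    private final Optional<List<List<String>>> dataViewSpecs;

    CodeGeneratorSketch(Optional<List<List<String>>> dataViewSpecs) {
        this.dataViewSpecs = dataViewSpecs;
    }

    // Convenience constructor playing the role of a `None` default value.
    CodeGeneratorSketch() {
        this(Optional.empty());
    }

    // The former boolean flag is derived from the presence of the specs.
    boolean isStateBacked() {
        return dataViewSpecs.isPresent();
    }

    // Number of data view fields to generate; zero when nothing is configured.
    int dataViewCount() {
        return dataViewSpecs
            .map(specs -> specs.stream().mapToInt(List::size).sum())
            .orElse(0);
    }
}
```

With this shape, callers that do not use state-backed views never construct a config object at all, so a separate flag cannot disagree with the specs.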




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112753
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140780#comment-16140780
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107137
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
--- End diff --

`when use state backend` -> `if it is backed by a state backend.`


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140797#comment-16140797
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135131075
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -79,13 +95,15 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: DataViewConfig)
--- End diff --

Make this an `Option[Array[Seq[DataViewSpec[_]]]]` with `None` as the default 
value?




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140803#comment-16140803
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == 
classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |
org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
+} else if (dataViewField.getType == classOf[ListView[_]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateListView(
+ |  $contextTerm.getListState((
+ |
org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

indent




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140808#comment-16140808
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135139164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -356,6 +486,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig != null && accConfig.isStateBackedDataViews) {
+  throw new CodeGenException("DataView doesn't support merge when 
the backend uses state.")
--- End diff --

Can we throw this exception earlier (e.g., in `AggregateUtil`) and give 
more details about the aggregation function?
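
A rough sketch of what such an early check could look like, in plain Java. The class `MergeValidationSketch`, its parameters, and the use of `IllegalStateException` (instead of the `CodeGenException` in the diff) are assumptions made for illustration; the point is only that the message names the offending aggregate function.

```java
// Hypothetical early check; in the PR this would live near the planning code
// (for example `AggregateUtil`) rather than in the code generator.
class MergeValidationSketch {
    static void validate(String aggregateName, boolean needMerge, boolean stateBacked) {
        if (needMerge && stateBacked) {
            // Name the function so the user can tell which aggregate is affected.
            throw new IllegalStateException(
                "Aggregate function '" + aggregateName + "' uses a state-backed DataView, "
                    + "which does not support merge.");
        }
    }
}
```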




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125473
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Move this method to `AggregationCodeGenerator`?






[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140812#comment-16140812
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
--- End diff --

Do we need this method? Instantiation of state descriptors can be code 
generated.
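
For context, the round trip that `serialize` and `deserialize` perform in the diff can be approximated with the JDK alone. The sketch below is an assumption-laden analog: it uses `java.io` object serialization and `java.util.Base64` in place of Flink's `InstantiationUtil` and the `Base64` helper from the diff, resolves classes with the default class loader rather than the thread context class loader, and the class name `DescriptorRoundTripSketch` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

class DescriptorRoundTripSketch {
    // Java-serialize the object and encode the bytes as a URL-safe Base64 string
    // so that it can be embedded as a string literal in generated code.
    static String serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    // Decode the string and rebuild the object when the generated code is opened.
    static Object deserialize(String data) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
```

The alternative raised in the comment is to have the generator emit the descriptor's constructor call directly, which would avoid embedding an opaque serialized blob in the generated source.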




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140793#comment-16140793
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112227
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
--- End diff --

Please remove the annotation.
We don't use the `@Internal`, `@Public` and `@PublicEvolving` annotations 
in the `flink-table` module.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140805#comment-16140805
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112266
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.ListSerializer
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * [[ListView]] type information.
+  *
+  * @param elementType element type information
+  * @tparam T element type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140809#comment-16140809
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112662
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.createInstance())
+mapview
+  }
+
+  override def copy(from: MapView[K, V]): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.copy(from.map))
--- End diff --

Same as for `ListViewSerializer`. Copying one map into another is more 
expensive than just replacing the map instance.
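
The difference can be sketched in plain Java as follows. `MapViewSketch` and `CopySketch` are hypothetical, simplified stand-ins rather than the classes from the PR, and `new HashMap<>(from.map)` stands in for `mapSerializer.copy(from.map)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for MapView; not the class from the PR.
class MapViewSketch<K, V> {
    Map<K, V> map;

    MapViewSketch() {
        this(new HashMap<K, V>());
    }

    MapViewSketch(Map<K, V> map) {
        this.map = map;
    }
}

class CopySketch {
    // Variant from the diff: the freshly copied map is iterated again by putAll.
    static <K, V> MapViewSketch<K, V> copyWithPutAll(MapViewSketch<K, V> from) {
        MapViewSketch<K, V> view = new MapViewSketch<>();
        view.map.putAll(new HashMap<>(from.map));
        return view;
    }

    // Variant suggested in the review: hand the copied map to the view directly.
    static <K, V> MapViewSketch<K, V> copyByReplacing(MapViewSketch<K, V> from) {
        return new MapViewSketch<>(new HashMap<>(from.map));
    }
}
```

Both variants yield an independent copy with the same entries; the second skips the extra pass over all entries and the temporary map that is discarded afterwards.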




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140804#comment-16140804
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
--- End diff --

Please fix the indentation here.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140790#comment-16140790
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112753
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135133385
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
--- End diff --

The code is not `ListView` specific. Change to `DataView`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140792#comment-16140792
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -759,10 +786,12 @@ object AggregateUtil {
 : GroupCombineFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, accTypes) = transformToAggregateFunctions(
+val isStateBackedDataViews = false
+val (aggFieldIndexes, aggregates, accTypes, accSpecs) = transformToAggregateFunctions(
--- End diff --

`transformToAggregateFunctions` has a default value for
`isStateBackedDataViews`, so we don't need to pass the parameter here. Moreover,
we only need to pass data view information to the code generator if the data
views are backed by state (if we make it an optional parameter). So most of the
changes here can be reverted, and likewise for all other operators that do not
support state-backed views.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140791#comment-16140791
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
+listview
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): Unit = {
+listSerializer.serialize(record.list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.deserialize(source))
--- End diff --

Same as before: we could set the list directly instead of copying its elements.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140781#comment-16140781
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135108408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1458,9 +1459,10 @@ abstract class CodeGenerator(
 * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
 *
 * @param function [[UserDefinedFunction]] object to be instantiated during runtime
+* @param contextTerm [[RuntimeContext]] term
--- End diff --

`term to access the [[RuntimeContext]]`




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
--- End diff --

do we need this method? Instantiation of state descriptors can be code 
generated.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135021
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |    org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

Please fix the indentation here.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140779#comment-16140779
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109902
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
--- End diff --

I think this call is not necessary. The new list instance is empty, so 
nothing is added.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140795#comment-16140795
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128323
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView<String, Integer> map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) {
+   Iterator<CountDistinctAccum> iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator<String> mapItr = mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.remove(String.valueOf(id));
--- End diff --

shouldn't a count distinct with retraction increment the counter value in 
the MapView in 
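
The comment above is cut off in the archive. One possible reading is that each key should carry an occurrence counter, so that a retraction only removes the key once every occurrence has been retracted. The following is a minimal sketch of that technique under stated assumptions: a plain `java.util.HashMap` stands in for the `MapView`, and the class and method names (`RetractableCountDistinctSketch`, `Accum`) are hypothetical and not taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

final class RetractableCountDistinctSketch {
    /** Hypothetical accumulator; a HashMap stands in for the MapView. */
    static final class Accum {
        final Map<String, Integer> occurrences = new HashMap<>();
        long distinct = 0L;
    }

    /** Increments the per-key counter; the distinct count grows only on first sight. */
    static void accumulate(Accum acc, String id) {
        int seen = acc.occurrences.merge(id, 1, Integer::sum);
        if (seen == 1) {
            acc.distinct += 1;
        }
    }

    /** Decrements the per-key counter; the key is dropped when it reaches zero. */
    static void retract(Accum acc, String id) {
        Integer seen = acc.occurrences.get(id);
        if (seen == null) {
            return; // nothing to retract for an unknown key
        }
        if (seen == 1) {
            acc.occurrences.remove(id);
            acc.distinct -= 1;
        } else {
            acc.occurrences.put(id, seen - 1);
        }
    }
}
```

With this layout, accumulating the same key twice and retracting it once leaves the distinct count unchanged, which a presence-only map cannot express.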

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140801#comment-16140801
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125473
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Move this method to `AggregationCodeGenerator`?




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140807#comment-16140807
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
--- End diff --

do we need this method? Instantiation of state descriptors can be code 
generated.
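
For readers unfamiliar with the pattern under discussion, the `serialize`/`deserialize` pair in the diff turns an object into a URL-safe Base64 string that can be embedded in generated code and decoded again at runtime. Below is a rough, self-contained sketch of the same round trip using only JDK classes (`java.util.Base64` and Java object streams) in place of the Commons Codec `Base64` and Flink's `InstantiationUtil` used in the diff. The class name `DescriptorStringCodecSketch` is hypothetical, and the sketch ignores the custom class loader handling that the original method performs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

final class DescriptorStringCodecSketch {
    /** Java-serializes the value and encodes the bytes as URL-safe Base64 without padding. */
    static String serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    /** Decodes the URL-safe Base64 string and reads the object back. */
    static Object deserialize(String data) {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The reviewer's alternative, generating code that instantiates the descriptor directly, would avoid embedding such an opaque string in the generated source altogether.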




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140786#comment-16140786
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135106244
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
--- End diff --

`when use state backend..` -> `if it is backed by a state backend.`




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140783#comment-16140783
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for the length of the map,
+  * followed by the serialized representation of each key-value pair. To allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
--- End diff --

Please remove the annotation




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140782#comment-16140782
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
--- End diff --

`addAll()` adds overhead because all elements are copied from one list to 
the other. Can't the serializer have direct access to the list field of the 
`ListView` and set the list copy as the `ListView`'s list?
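
A minimal sketch of the difference, written in Java with simplified stand-in 
classes. `SimpleListView`, its list-adopting constructor and `copyList` are 
hypothetical names used only for illustration; they are not the classes or 
signatures from this pull request.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for ListView; not Flink's actual class.
final class SimpleListView<T> {
    // Backing list; a serializer in the same package may hand one in directly.
    final List<T> list;

    SimpleListView() {
        this(new ArrayList<>());
    }

    // Hypothetical constructor: adopts the given list without copying it.
    SimpleListView(List<T> backingList) {
        this.list = backingList;
    }

    void addAll(List<T> values) {
        list.addAll(values);
    }
}

final class ListViewCopySketch {
    // Stand-in for ListSerializer.copy: produces one fresh copy of the list.
    static <T> List<T> copyList(List<T> from) {
        return new ArrayList<>(from);
    }

    // Pattern from the diff: copy the list, then copy the elements again via addAll().
    static <T> SimpleListView<T> copyWithAddAll(SimpleListView<T> from) {
        SimpleListView<T> view = new SimpleListView<>();
        view.addAll(copyList(from.list));
        return view;
    }

    // Suggested pattern: the copied list becomes the view's backing list.
    static <T> SimpleListView<T> copyDirect(SimpleListView<T> from) {
        return new SimpleListView<>(copyList(from.list));
    }
}
```

Both variants return a view that is independent of the source, but 
`copyDirect` walks the elements once instead of twice.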




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140798#comment-16140798
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135021
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == 
classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |
org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

indent




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140802#comment-16140802
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1398,14 +1440,29 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  agg,
+  accTypes(index),
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accSpecs(index) = specs.get
+  accTypes(index) = accType
+} else {
+  accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

use same order as above:
```
accSpecs(index) = ...
accTypes(index) = 
```




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140811#comment-16140811
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135133385
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
--- End diff --

The code is not `ListView` specific. Change to `DataView`?




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140806#comment-16140806
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135140565
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Do we need this method? State descriptors can be instantiated by generated 
code.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140810#comment-16140810
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135138596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -178,13 +294,15 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = genDataViewFieldSetter(s"acc$i", i)
 j"""
|org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
|  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
+   |$setDataView
--- End diff --

directly call `genDataViewFieldSetter(s"acc$i", i)` here? 
Would make it easier to follow the `acc$i` variables.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134452
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Why do we need to serialize and deserialize state descriptors? Can't we 
just generate the code to instantiate them? IMO, that would be more 
straightforward and easier to debug.
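
A rough sketch of what generating the instantiation could look like, written 
in Java. The helper class, method name and parameters are hypothetical and not 
part of this pull request. The only Flink API assumed is the 
`ListStateDescriptor(String name, Class<T> typeClass)` constructor, which 
appears only inside the generated string, so the sketch itself has no Flink 
dependency.

```java
// Sketch of a code generator helper; class and method names are hypothetical.
final class DescriptorCodeGenSketch {
    // Returns a Java statement that instantiates a ListStateDescriptor directly,
    // instead of deserializing one from a string embedded in the generated code.
    // The generated statement uses the raw descriptor type for brevity.
    static String genListStateDescriptor(
            String fieldTerm, String stateName, String elementClass) {
        String descClass = "org.apache.flink.api.common.state.ListStateDescriptor";
        return String.format(
            "%s %s = new %s(\"%s\", %s.class);",
            descClass, fieldTerm, descClass, stateName, elementClass);
    }
}
```

Calling `genListStateDescriptor("acc0_list_desc", "agg0$list", "java.lang.String")` 
yields a single readable statement in the generated function. One caveat: the 
class-based constructor only covers element types that Flink can derive from a 
class, so descriptors built from arbitrary `TypeInformation` would still need 
some other way to reach the generated code.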


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
+listview
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+listSerializer.serialize(record.list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.deserialize(source))
--- End diff --

Same as before: we could set the list directly.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140789#comment-16140789
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109036
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
--- End diff --

lost -> list, remove the `` tag




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140787#comment-16140787
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112454
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.createInstance())
--- End diff --

No need to add anything to the new instance. The map should be empty.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140785#comment-16140785
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @throws Exception Thrown if the system cannot get data.
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  @throws[Exception]
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adding the given value to the list.
+*
+* @throws Exception Thrown if the system cannot add data.
+* @param value element to be appended to this list
+*/
+  @throws[Exception]
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Copies all of the elements from the specified list to this list view.
+*
+* @throws Exception Thrown if the system cannot add all data.
+* @param list The list to be stored in this list view.
+*/
+  @throws[Exception]
+  def addAll(list: util.List[T]): Unit = this.list.addAll(list)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  override def equals(other: Any): Boolean = other match {
+case that: ListView[_] =>
--- End diff --

`case that: ListView[T] =>`?



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140800#comment-16140800
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135117112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,139 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Analyze the constructor to get the type information of the MapView 
or ListView type variables
+* inside the accumulate.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+
+if (keyTypeInfo != null && valueTypeInfo != null) {
+  resultMap.put(field.getName, new 
MapViewTypeInfo(keyTypeInfo, valueTypeInfo))
+}
+  } else {
+resultMap.put(field.getName, null)
+  }
+} else if (field.getType == classOf[ListView[_]]) {
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  val elementTypeInfo = listView.elementTypeInfo
+
+  if (elementTypeInfo != null) {
+resultMap.put(field.getName, new 
ListViewTypeInfo(elementTypeInfo))
+  }
+}
+  }
+}
+
+resultMap
+  }
+
+  /**
+* Remove StateView fields from accumulator type information.
+*
+* @param index index of aggregate function
+* @param aggFun aggregate function
+* @param accType accumulator type information, only support pojo type
+* @param isStateBackedDataViews is data views use state backend
+* @return mapping of accumulator type information and data view config 
which contains id,
+* field name and state descriptor
+*/
+  def removeStateViewFieldsFromAccTypeInfo(
+index: Int,
+aggFun: AggregateFunction[_, _],
+accType: TypeInformation[_],
+isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+var hasDataView = false
+val acc = aggFun.createAccumulator()
+accType match {
+  case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+val arity = pojoType.getArity
+val newPojoFields = new util.ArrayList[PojoField]()
+val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+for (i <- 0 until arity) {
+  val pojoField = pojoType.getPojoFieldAt(i)
+  val field = pojoField.getField
+  val fieldName = field.getName
+  field.setAccessible(true)
+
+  pojoField.getTypeInformation match {
+case map: MapViewTypeInfo[Any, Any] =>
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo 
!= null) {
+  hasDataView = true
+  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+} else {
+  map
+}
+
+var spec = MapViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field 
which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140794#comment-16140794
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134452
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Why do we need to serialize and deserialize state descriptors? Can't we 
just generate the code to instantiate them? IMO, that would be more 
straightforward and easier to debug.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
--- End diff --

Do we need this method? The instantiation of state descriptors can be 
code-generated.
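
For readers following the thread, the round trip in question can be sketched with the JDK alone. This is only an illustration under stated assumptions: `FakeDescriptor` and `DescriptorCodec` are hypothetical names, and `java.util.Base64` plus plain Java serialization stand in for the commons-codec `Base64` and Flink's `InstantiationUtil` used in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

// Hypothetical stand-in for a Flink StateDescriptor; only a name is kept.
final class FakeDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    FakeDescriptor(String name) {
        this.name = name;
    }
}

// Hypothetical helper mirroring the serialize/deserialize pair in the diff.
final class DescriptorCodec {
    private DescriptorCodec() {}

    // Java-serialize the object, then encode the bytes as URL-safe Base64.
    static String serialize(Serializable descriptor) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(descriptor);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize descriptor", e);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    // Decode the Base64 string and rebuild the object from its bytes.
    static Object deserialize(String data) {
        byte[] raw = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize descriptor", e);
        }
    }
}
```

The string returned by `serialize` is what would end up embedded in the generated source. It is opaque to a reader of that source, which is presumably the debugging concern raised in the review.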


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1398,14 +1440,29 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  agg,
+  accTypes(index),
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accSpecs(index) = specs.get
+  accTypes(index) = accType
+} else {
+  accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

Use the same order as above:
```
accSpecs(index) = ...
accTypes(index) = 
```




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
--- End diff --

Please fix the indentation here.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112227
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
--- End diff --

Please remove the annotation.
We don't use the `@Internal`, `@Public` and `@PublicEvolving` annotations 
in the `flink-table` module.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -759,10 +786,12 @@ object AggregateUtil {
 : GroupCombineFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, accTypes) = 
transformToAggregateFunctions(
+val isStateBackedDataViews = false
+val (aggFieldIndexes, aggregates, accTypes, accSpecs) = 
transformToAggregateFunctions(
--- End diff --

`transformToAggregateFunctions` has a default value for 
`isStateBackedDataViews`, so we don't need to pass the parameter here. 
Moreover, we only need to pass data view information to the code generator 
if the data views are backed by state (if we make it an optional parameter). 
So most of the changes here can be reverted (also for all other operators 
that do not support state-backed views).




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107609
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+@transient private[flink] val keyTypeInfo: TypeInformation[K],
+@transient private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  private[flink] var map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code 
null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def remove(key: K): Unit = map.remove(key)
+
+  /**
+* Returns whether there exists the given mapping.

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
--- End diff --

`addAll()` adds overhead because all elements are copied from one list to 
the other. Can't the serializer have direct access to the list field of the 
`ListView` and set the list copy as the `ListView`'s list?
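
The difference between the two approaches can be sketched as follows. This is a simplified illustration, not the Flink classes: `SimpleListView`, `ListViewCopies` and `copyList` are hypothetical names, and `copyList` merely stands in for `ListSerializer.copy`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ListView with an accessible list field.
final class SimpleListView<T> {
    List<T> list = new ArrayList<>();

    void addAll(List<T> elements) {
        list.addAll(elements);
    }
}

final class ListViewCopies {
    private ListViewCopies() {}

    // Stand-in for ListSerializer.copy: produces a new list with the same elements.
    static <T> List<T> copyList(List<T> from) {
        return new ArrayList<>(from);
    }

    // Variant in the diff: the freshly copied list is copied a second time by addAll.
    static <T> SimpleListView<T> copyWithAddAll(SimpleListView<T> from) {
        SimpleListView<T> view = new SimpleListView<>();
        view.addAll(copyList(from.list));
        return view;
    }

    // Variant suggested in the review: the copied list becomes the backing list directly.
    static <T> SimpleListView<T> copyByReplacing(SimpleListView<T> from) {
        SimpleListView<T> view = new SimpleListView<>();
        view.list = copyList(from.list);
        return view;
    }
}
```

Both variants yield a view that is independent of the source; the second one avoids the extra pass over the elements.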




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
--- End diff --

Please remove the annotation.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112662
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.createInstance())
+mapview
+  }
+
+  override def copy(from: MapView[K, V]): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.copy(from.map))
--- End diff --

Same as for `ListViewSerializer`. Copying one map into another is more 
expensive than just replacing the map instance.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109902
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
--- End diff --

I think this call is not necessary. The new list instance is empty, so 
nothing is added.
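
A small check of that claim, using plain `java.util` lists rather than the Flink types (the class name `EmptyAddAllDemo` is made up for this sketch, and it assumes the list returned by `createInstance()` is empty, as stated above):

```java
import java.util.ArrayList;
import java.util.List;

final class EmptyAddAllDemo {
    private EmptyAddAllDemo() {}

    // addAll reports whether the target list changed as a result of the call.
    static boolean addAllChangesList() {
        List<String> target = new ArrayList<>();
        List<String> freshInstance = new ArrayList<>(); // assumed empty, like a new list instance
        return target.addAll(freshInstance);
    }
}
```

`addAllChangesList()` returns `false`, since adding an empty list leaves the target unchanged.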




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135117112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,139 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Analyze the constructor to get the type information of the MapView 
or ListView type variables
+* inside the accumulate.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+
+if (keyTypeInfo != null && valueTypeInfo != null) {
+  resultMap.put(field.getName, new 
MapViewTypeInfo(keyTypeInfo, valueTypeInfo))
+}
+  } else {
+resultMap.put(field.getName, null)
+  }
+} else if (field.getType == classOf[ListView[_]]) {
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  val elementTypeInfo = listView.elementTypeInfo
+
+  if (elementTypeInfo != null) {
+resultMap.put(field.getName, new 
ListViewTypeInfo(elementTypeInfo))
+  }
+}
+  }
+}
+
+resultMap
+  }
+
+  /**
+* Remove StateView fields from accumulator type information.
+*
+* @param index index of aggregate function
+* @param aggFun aggregate function
+* @param accType accumulator type information, only support pojo type
+* @param isStateBackedDataViews is data views use state backend
+* @return mapping of accumulator type information and data view config 
which contains id,
+* field name and state descriptor
+*/
+  def removeStateViewFieldsFromAccTypeInfo(
+index: Int,
+aggFun: AggregateFunction[_, _],
+accType: TypeInformation[_],
+isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+var hasDataView = false
+val acc = aggFun.createAccumulator()
+accType match {
+  case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+val arity = pojoType.getArity
+val newPojoFields = new util.ArrayList[PojoField]()
+val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+for (i <- 0 until arity) {
+  val pojoField = pojoType.getPojoFieldAt(i)
+  val field = pojoField.getField
+  val fieldName = field.getName
+  field.setAccessible(true)
+
+  pojoField.getTypeInformation match {
+case map: MapViewTypeInfo[Any, Any] =>
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo 
!= null) {
+  hasDataView = true
+  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+} else {
+  map
+}
+
+var spec = MapViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field 
which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case list: ListViewTypeInfo[Any] =>
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  if (listView != null) {
+val elementTypeInfo = listView.elementTypeInfo

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == 
classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |
org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
+} else if (dataViewField.getType == classOf[ListView[_]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateListView(
+ |  $contextTerm.getListState((
+ |
org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @throws Exception Thrown if the system cannot get data.
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  @throws[Exception]
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adding the given value to the list.
+*
+* @throws Exception Thrown if the system cannot add data.
+* @param value element to be appended to this list
+*/
+  @throws[Exception]
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Copies all of the elements from the specified list to this list view.
+*
+* @throws Exception Thrown if the system cannot add all data.
+* @param list The list to be stored in this list view.
+*/
+  @throws[Exception]
+  def addAll(list: util.List[T]): Unit = this.list.addAll(list)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  override def equals(other: Any): Boolean = other match {
+case that: ListView[_] =>
--- End diff --

`case that: ListView[T] =>`?
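
A note on the suggestion above: generic type arguments are erased on the JVM, so a type pattern such as `ListView[T]` cannot actually test `T` at runtime, and the Scala compiler flags it as unchecked. The Java sketch below shows the same constraint using a hypothetical `SimpleListView` class (an illustration only, not Flink's `ListView`): `equals` can only check the raw class through a wildcard and must then compare the contents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a list-backed view; not part of Flink.
final class SimpleListView<T> {
    private final List<T> list = new ArrayList<>();

    void add(T value) {
        list.add(value);
    }

    @Override
    public boolean equals(Object other) {
        // The element type is erased at runtime, so only the raw class can be
        // tested. The wildcard makes that explicit; equality is decided by content.
        if (other instanceof SimpleListView<?>) {
            return list.equals(((SimpleListView<?>) other).list);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return list.hashCode();
    }
}
```

As a consequence, two empty views compare equal even if they were declared with different element types, which is the same behavior the wildcard pattern `ListView[_]` gives in Scala.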




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135138596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -178,13 +294,15 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = genDataViewFieldSetter(s"acc$i", i)
 j"""
|org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
|  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
+   |$setDataView
--- End diff --

directly call `genDataViewFieldSetter(s"acc$i", i)` here? 
Would make it easier to follow the `acc$i` variables.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135139164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -356,6 +486,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig != null && accConfig.isStateBackedDataViews) {
+  throw new CodeGenException("DataView doesn't support merge when 
the backend uses state.")
--- End diff --

Can we throw this exception earlier (e.g., in `AggregateUtil`) and give 
more details about the aggregation function?
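
One way to read this request is a fail-fast check that runs before code generation and names the offending function in the message. The Java sketch below is only an illustration under that assumption: `MergeSupportCheck`, `checkMergeSupported` and `UnsupportedAggregationException` are hypothetical names, not Flink APIs, and the real check would live wherever the planner already knows the aggregate function and the accumulator configuration.

```java
// Hypothetical helper; none of these names exist in Flink.
final class MergeSupportCheck {

    /** Hypothetical stand-in for the exception type the planner would throw. */
    static final class UnsupportedAggregationException extends RuntimeException {
        UnsupportedAggregationException(String message) {
            super(message);
        }
    }

    /**
     * Rejects the unsupported combination early and reports which function
     * caused it, instead of failing later inside the generated code.
     */
    static void checkMergeSupported(
            Object aggFunction, boolean needMerge, boolean stateBackedDataViews) {
        if (needMerge && stateBackedDataViews) {
            throw new UnsupportedAggregationException(
                "Aggregate function " + aggFunction.getClass().getName()
                    + " requires merge(), which is not supported when its"
                    + " data views are backed by state.");
        }
    }
}
```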




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128744
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
  

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112266
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.ListSerializer
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * [[ListView]] type information.
+  *
+  * @param elementType element type information
+  * @tparam T element type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135108408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1458,9 +1459,10 @@ abstract class CodeGenerator(
 * Adds a reusable [[UserDefinedFunction]] to the member area of the 
generated [[Function]].
 *
 * @param function [[UserDefinedFunction]] object to be instantiated 
during runtime
+* @param contextTerm [[RuntimeContext]] term
--- End diff --

`term to access the [[RuntimeContext]]`




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135106244
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
--- End diff --

`when use state backend..` -> `if it is backed by a state backend.`




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107137
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
--- End diff --

`when use state backend` -> `if it is backed by a state backend.`




[jira] [Commented] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-08-24 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140606#comment-16140606
 ] 

Shashank Agarwal commented on FLINK-7484:
-

Got another crash. I have not used any INR class in my code. Maybe it is being 
picked up automatically from some value.

{code}
com.esotericsoftware.kryo.KryoException: Unable to find class: INR
Serialization trace:
underlying (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
at 
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: INR
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 23 more
{code}

> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> ---
>
> Key: FLINK-7484
> URL: https://issues.apache.org/jira/browse/FLINK-7484
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, DataStream API, Scala API
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
>Reporter: Shashank Agarwal
>
> I am using many CEPs and a Global Window. I sometimes get the following error 
> and the application crashes. I have checked the logic and there is no flaw in 
> the program. Here ItemPojo is a POJO class and we are using 
> java.util.List[ItemPojo]. We are using the Scala DataStream API; please find 
> the attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched 
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at 

[jira] [Closed] (FLINK-7460) Close all ColumnFamilyHandles when restoring rescaled incremental checkpoints

2017-08-24 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-7460.
-
Resolution: Fixed

merged in ca87bec4f7.

> Close all ColumnFamilyHandles when restoring rescaled incremental checkpoints
> -
>
> Key: FLINK-7460
> URL: https://issues.apache.org/jira/browse/FLINK-7460
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> The restore method of the `RocksDBKeyedStateBackend` contains a case for 
> rescaling incremental checkpoints. This code creates temporary RocksDB 
> instances, but does not close the ColumnFamilyHandles created for those 
> temporary instances. This has shown up as assertion errors from RocksDB in 
> certain tests.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7505) Use lambdas in suppressed exception idiom

2017-08-24 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-7505.
-
Resolution: Fixed

merged in 5456cf9f8f.

> Use lambdas in suppressed exception idiom
> -
>
> Key: FLINK-7505
> URL: https://issues.apache.org/jira/browse/FLINK-7505
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Trivial
>
> We can use Java 8 lambdas for the suppressed exception idiom on loops to unify 
> the code for all possible methods without parameters that throw exceptions, 
> such as {{close()}}, {{dispose()}}, etc.
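
For readers unfamiliar with the idiom described above, here is a minimal Java 8 sketch. It is not the code merged for this issue; `SuppressedExceptions`, `ThrowingConsumer` and `applyToAll` are hypothetical names chosen for illustration. The loop calls the action on every element, remembers the first failure, and attaches later failures to it via `Throwable.addSuppressed`.

```java
// Hypothetical utility; an illustration of the idiom, not Flink's implementation.
final class SuppressedExceptions {

    /** Hypothetical functional interface: an action on T that may throw. */
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    /**
     * Applies the action to every element even if some calls fail. The first
     * failure is rethrown after the loop; later ones are added as suppressed.
     */
    static <T> void applyToAll(Iterable<T> items, ThrowingConsumer<T> action)
            throws Exception {
        Exception first = null;
        for (T item : items) {
            try {
                action.accept(item);
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

With such a helper, the same loop serves any no-argument method that throws, for example `SuppressedExceptions.applyToAll(closeables, AutoCloseable::close)` for a collection of `AutoCloseable` resources.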





[jira] [Closed] (FLINK-7461) Remove Backwards compatibility for Flink 1.1 from Flink 1.4

2017-08-24 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-7461.
-
Resolution: Fixed

merged in 6642768ad8.

> Remove Backwards compatibility for Flink 1.1 from Flink 1.4
> ---
>
> Key: FLINK-7461
> URL: https://issues.apache.org/jira/browse/FLINK-7461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> This issue tracks the removal of Flink 1.1 backwards compatibility from Flink 
> 1.4. This step is helpful for further developments because it will remove 
> many old code paths and special cases. In particular, we can drop all 
> handling for non-partitionable state, i.e. state that was created with the 
> old {{Checkpointed}} interface.




