[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4355 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135697894 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * if it is backed by a state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T]( +@transient private[flink] val elementTypeInfo: TypeInformation[T]) + extends DataView { + + def this() = this(null) + + private[flink] var list: util.List[T] = new util.ArrayList[T]() --- End diff -- The same modification should also be applied to the `MapView`. Very good suggestion.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135541895 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[ListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the list, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +class ListViewSerializer[T](val listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = false + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = { +new ListView[T] + } + + override def copy(from: ListView[T]): ListView[T] = { +val listview = new ListView[T] +listview.list = from.list --- End diff -- We should create a copy of `from.list` using the `ListSerializer`. Otherwise we share the instance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
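The aliasing problem raised for `copy()` can be reproduced without Flink. The following is only a minimal sketch: `SimpleListView` and `ListViewCopySketch` are hypothetical stand-ins, not classes from the pull request, and a plain `ArrayList` copy replaces the `ListSerializer`-based copy that the review asks for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ListView; not the class from the pull request.
final class SimpleListView<T> {
    List<T> list = new ArrayList<>();
}

final class ListViewCopySketch {

    // Shallow copy: the new view shares the backing list with the source.
    static <T> SimpleListView<T> shallowCopy(SimpleListView<T> from) {
        SimpleListView<T> view = new SimpleListView<>();
        view.list = from.list;
        return view;
    }

    // Copy with its own backing list. The elements themselves are still shared;
    // a real serializer would also copy each element via the element serializer.
    static <T> SimpleListView<T> independentCopy(SimpleListView<T> from) {
        SimpleListView<T> view = new SimpleListView<>();
        view.list = new ArrayList<>(from.list);
        return view;
    }

    public static void main(String[] args) {
        SimpleListView<String> original = new SimpleListView<>();
        original.list.add("a");

        SimpleListView<String> shallow = shallowCopy(original);
        SimpleListView<String> independent = independentCopy(original);

        // Mutating the original after copying exposes the shared instance.
        original.list.add("b");

        System.out.println(shallow.list.size());     // prints 2: sees the later add
        System.out.println(independent.list.size()); // prints 1: unaffected
    }
}
```

With the shallow variant, any later change to the source accumulator silently changes the "copy" as well, which is the instance sharing the comment warns about.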
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135534660 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,119 @@ class AggregationCodeGenerator( } } +/** + * Create DataView Term, for example, acc1_map_dataview. + * + * @param aggIndex index of aggregate function + * @param fieldName field name of DataView + * @return term to access [[MapView]] or [[ListView]] + */ +def createDataViewTerm(aggIndex: Int, fieldName: String): String = { + s"acc${aggIndex}_${fieldName}_dataview" +} + +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { --- End diff -- Add parentheses to method. Only methods without side-effects should have no parentheses. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135550350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * if it is backed by a state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T]( +@transient private[flink] val elementTypeInfo: TypeInformation[T]) + extends DataView { + + def this() = this(null) + + private[flink] var list: util.List[T] = new util.ArrayList[T]() --- End diff -- right now an empty `ArrayList` is always created when a `ListView` is instantiated. This is unnecessary overhead when the `ListView` is copied or deserialized using `ListViewSerializer` because the empty instance is immediately replaced. We should add an option to create a `ListView` without an `ArrayList` instance. This means we have to move the creation of the `ArrayList` out of the primary constructor. The same applies to the `MapView`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135528903 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,119 @@ class AggregationCodeGenerator( } } +/** + * Create DataView Term, for example, acc1_map_dataview. + * + * @param aggIndex index of aggregate function + * @param fieldName field name of DataView + * @return term to access [[MapView]] or [[ListView]] + */ +def createDataViewTerm(aggIndex: Int, fieldName: String): String = { + s"acc${aggIndex}_${fieldName}_dataview" +} + +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig.isDefined) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.get(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) --- End diff -- move `serialize` method to this class and rename to `serializeStateDescriptor` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r13411 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * if it is backed by a state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T]( +@transient private[flink] val elementTypeInfo: TypeInformation[T]) + extends DataView { + + def this() = this(null) + + private[flink] var list: util.List[T] = new util.ArrayList[T]() --- End diff --

We can refactor the `ListView` constructors as follows:

```
class ListView[T] private[flink](
    @transient private[flink] val elementTypeInfo: TypeInformation[T],
    private[flink] val list: util.List[T])
  extends DataView {

  def this(elementTypeInfo: TypeInformation[T]) {
    this(elementTypeInfo, new util.ArrayList[T]())
  }

  def this() = {
    this(null, new util.ArrayList[T]())
  }

  ...
}
```

and call the primary constructor in the `ListViewSerializer` with `null` for the type information.
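A rough Java rendering of this constructor layout may make the intent easier to see. It is only a sketch under assumptions: `LazyListView` and `LazyListViewSerializerSketch` are hypothetical names, the type information parameter is left out, and the copy uses a plain `ArrayList` instead of a Flink serializer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java rendering of the suggested constructor layout.
final class LazyListView<T> {
    final List<T> list;

    // Primary constructor: the caller supplies the backing list,
    // so no throwaway empty list is allocated.
    LazyListView(List<T> list) {
        this.list = list;
    }

    // Convenience constructor for user code: starts with an empty list.
    LazyListView() {
        this(new ArrayList<>());
    }
}

// Hypothetical serializer-side helper that uses the primary constructor.
final class LazyListViewSerializerSketch {
    static <T> LazyListView<T> copy(LazyListView<T> from) {
        // Exactly one list is created for the copy.
        return new LazyListView<>(new ArrayList<>(from.list));
    }
}
```

The point of the layout is that only the user-facing constructors allocate an empty list, while copy and deserialization paths hand in the list they already have.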
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135539685 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -249,7 +258,8 @@ object AggregateUtil { outputArity, needRetract, needMerge = false, - needReset = true + needReset = true, --- End diff -- `needReset` can be `false`. `resetAccumulator()` is not called by any of the window operators. Not sure why this was `true` before...
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135528733 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,119 @@ class AggregationCodeGenerator( } } +/** + * Create DataView Term, for example, acc1_map_dataview. + * + * @param aggIndex index of aggregate function + * @param fieldName field name of DataView + * @return term to access [[MapView]] or [[ListView]] + */ +def createDataViewTerm(aggIndex: Int, fieldName: String): String = { + s"acc${aggIndex}_${fieldName}_dataview" +} + +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig.isDefined) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.get(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) +val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName) +val field = + s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + """.stripMargin +reusableMemberStatements.add(field) + +// create DataViews +val descFieldTerm = s"${dataViewFieldTerm}_desc" +val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName +val descDeserializeCode = + s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} --- End diff -- implement deserialization directly in generated code. 
Moreover, we should use the user code classloader for the deserialization which is accessible via the `RuntimeContext`.
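Deserializing with an explicit class loader can be sketched with the JDK alone. This is a minimal sketch, not the code generated by the pull request: `ClassLoaderAwareSerde` is a hypothetical helper, and in a Flink function the `loader` argument would presumably come from `getRuntimeContext().getUserCodeClassLoader()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper; illustrates class resolution against a given loader.
final class ClassLoaderAwareSerde {

    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    // Resolve classes against the supplied loader first.
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    // Fall back to the default lookup, e.g. for primitive types.
                    return super.resolveClass(desc);
                }
            }
        }) {
            return in.readObject();
        }
    }
}
```

Without the `resolveClass` override, Java serialization resolves classes through a default loader, which may not see types that exist only in the user's job jar.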
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135648347 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,92 @@ object UserDefinedFunctionUtils { // -- /** +* Remove StateView fields from accumulator type information. +* +* @param index index of aggregate function +* @param aggFun aggregate function +* @param accType accumulator type information, only support pojo type +* @param isStateBackedDataViews is data views use state backend +* @return mapping of accumulator type information and data view config which contains id, +* field name and state descriptor +*/ + def removeStateViewFieldsFromAccTypeInfo( +index: Int, +aggFun: AggregateFunction[_, _], +accType: TypeInformation[_], +isStateBackedDataViews: Boolean) + : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = { + +var hasDataView = false +val acc = aggFun.createAccumulator() +accType match { + case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 => +val arity = pojoType.getArity +val newPojoFields = new util.ArrayList[PojoField]() +val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]] +for (i <- 0 until arity) { + val pojoField = pojoType.getPojoFieldAt(i) + val field = pojoField.getField + val fieldName = field.getName + field.setAccessible(true) + + pojoField.getTypeInformation match { +case map: MapViewTypeInfo[Any, Any] => + val mapView = field.get(acc).asInstanceOf[MapView[_, _]] + if (mapView != null) { +val keyTypeInfo = mapView.keyTypeInfo +val valueTypeInfo = mapView.valueTypeInfo +val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) { + hasDataView = true + new MapViewTypeInfo(keyTypeInfo, valueTypeInfo) +} else { + map +} + +var spec = MapViewSpec( + "agg" + index + "$" + fieldName, // generate unique name to be used as state name + field, + newTypeInfo) + +accumulatorSpecs += spec +if 
(!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) +} + } + +case list: ListViewTypeInfo[Any] => + val listView = field.get(acc).asInstanceOf[ListView[_]] + if (listView != null) { +val elementTypeInfo = listView.elementTypeInfo +val newTypeInfo = if (elementTypeInfo != null) { + hasDataView = true + new ListViewTypeInfo(elementTypeInfo) +} else { + list +} + +var spec = ListViewSpec( + "agg" + index + "$" + fieldName, // generate unique name to be used as state name + field, + newTypeInfo) + +accumulatorSpecs += spec +if (!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) +} + } + +case _ => newPojoFields.add(pojoField) + } +} +(new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs)) + + case _ if !hasDataView => (accType, None) + case _ => throw new TableException("MapView and ListView only support in PoJo class") --- End diff -- This case will never be reached. `hasDataView` is only set to `true` in the `case pojoType: PojoTypeInfo[_]` case. Hence, it will always be false when we come to this point. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135587110 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1398,14 +1412,29 @@ object AggregateUtil { } } +val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + // create accumulator type information for every aggregate function aggregates.zipWithIndex.foreach { case (agg, index) => - if (null == accTypes(index)) { + if (accTypes(index) != null) { +val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index, + agg, + accTypes(index), + isStateBackedDataViews) +if (specs.isDefined) { + accSpecs(index) = specs.get + accTypes(index) = accType +} else { + accSpecs(index) = Seq() + accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg) --- End diff -- No need to override `accTypes(index)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135561193 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -237,9 +248,13 @@ public void resetAccumulator(CountDistinctAccum acc) { //Overloaded retract method public void retract(CountDistinctAccum accumulator, long id) { try { - if (!accumulator.map.contains(String.valueOf(id))) { - accumulator.map.remove(String.valueOf(id)); - accumulator.count -= 1; + Integer cnt = accumulator.map.get(String.valueOf(id)); + if (cnt != null) { + cnt -= 1; + if (cnt <= 0) { + accumulator.map.remove(String.valueOf(id)); + accumulator.count -= 1; + } --- End diff --

We should write the decremented count back to the map if it is still > 0:

```
else {
  accumulator.map.put(String.valueOf(id), cnt);
}
```
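The retract logic with the missing `else` branch can be tried out in isolation. This is a sketch under assumptions: `CountDistinctSketch` is a hypothetical class, a plain `HashMap` stands in for `MapView`, and the `accumulate` method that increments per-key counts is an assumption about how the counterpart would have to look.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified accumulator; a HashMap stands in for MapView.
final class CountDistinctSketch {
    final Map<String, Integer> map = new HashMap<>();
    long count = 0L;

    // Assumed counterpart of retract: tracks how often each id was seen.
    void accumulate(String id) {
        Integer cnt = map.get(id);
        if (cnt == null) {
            map.put(id, 1);
            count += 1; // first occurrence of this id
        } else {
            map.put(id, cnt + 1);
        }
    }

    void retract(String id) {
        Integer cnt = map.get(id);
        if (cnt != null) {
            cnt -= 1;
            if (cnt <= 0) {
                // Last occurrence retracted: the id is no longer distinct.
                map.remove(id);
                count -= 1;
            } else {
                // Still present: persist the decremented per-key count.
                map.put(id, cnt);
            }
        }
    }
}
```

Without the `else` branch the stored per-key count would never decrease, so an id that was accumulated twice would stay in the map no matter how often it is retracted.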
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135524948 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -42,6 +46,18 @@ class AggregationCodeGenerator( input: TypeInformation[_ <: Any]) extends CodeGenerator(config, nullableInput, input) { + // set of statements for cleanup dataview that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableCleanupStatements = mutable.LinkedHashSet[String]() + + /** +* @return code block of statements that need to be placed in the cleanup() method of --- End diff -- `RichFunction` does not have a `cleanup()` method. The `cleanup()` method is a method of `GeneratedAggregations`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135504282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.reflect.Field + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo} + +/** + * Data view specification. + * + * @tparam ACC type extends [[DataView]] + */ +trait DataViewSpec[ACC <: DataView] { + def id: String + def field: Field + def toStateDescriptor: StateDescriptor[_, _] --- End diff -- Very good point, you are right! We need to generate the state descriptors here, serialize them and ship them. Thanks for the clarification. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135503957 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function { * aggregated results */ def resetAccumulator(accumulators: Row) + + /** +* Cleanup for the accumulators. +*/ + def cleanup() + + /** +* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]]. +* It can be used for clean up work. By default, this method does nothing. +*/ + def close() --- End diff -- Sorry, I overlooked the `close()` calls. If the method is used, we should keep it of course. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135184770 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { + + @Override + public CountDistinctAccum createAccumulator() { + CountDistinctAccum accum = new CountDistinctAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); + accum.count = 0L; + return accum; + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, String id) { + try { + if (!accumulator.map.contains(id)) { + accumulator.map.put(id, 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.put(String.valueOf(id), 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Long getValue(CountDistinctAccum accumulator) { + return accumulator.count; + } + } + + /** +* CountDistinct aggregate with merge. 
+*/ + public static class CountDistinctWithMerge extends CountDistinct { + + //Overloaded merge method + public void merge(CountDistinctAccum acc, Iterable it) { + Iterator iter = it.iterator(); + while (iter.hasNext()) { + CountDistinctAccum mergeAcc = iter.next(); + acc.count += mergeAcc.count; + + try { + Iterator mapItr = mergeAcc.map.keys().iterator(); + while (mapItr.hasNext()) { + String key = mapItr.next(); + if (!acc.map.contains(key)) { + acc.map.put(key, 1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + /** +* CountDistinct aggregate with merge and reset. +*/ + public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge { + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** +* CountDistinct aggregate with retract. +*/ + public static class CountDistinctWithRetractAndReset extends CountDistinct { + + //Overloaded retract method + public void retract(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.remove(String.valueOf(id)); --- End diff -- The code here is the opposite. It should be: ``` if (accumulator.map.contains(String.valueOf(id))) { accumulator.count -= 1; accumulator.map.remove(String.valueOf(id)); } ``` One record come, the map will put (record
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135181433 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function { * aggregated results */ def resetAccumulator(accumulators: Row) + + /** +* Cleanup for the accumulators. +*/ + def cleanup() + + /** +* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]]. +* It can be used for clean up work. By default, this method does nothing. +*/ + def close() --- End diff -- The `close()` method corresponds to the `open()` method. It is called wherever `open()` was called before, e.g. in ProcTimeUnboundedOver's `close()` method. Otherwise, the UDAGG's `close()` would never be called. I modified the test case testGroupAggregateWithStateBackend to check that `close()` is called. What do you think?
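The pairing described in this comment can be illustrated with a minimal sketch in plain Java. All names here (`LifecycleSketch`, `GeneratedFunction`, `RecordingFunction`, `Operator`) are hypothetical stand-ins and not Flink classes; the guard on `opened` is an assumption about how "called when open was called before" might be enforced.

```java
class LifecycleSketch {

    // Hypothetical stand-in for the generated aggregation function.
    interface GeneratedFunction {
        void open();
        void close();
    }

    // Records lifecycle calls so the pairing can be observed.
    static class RecordingFunction implements GeneratedFunction {
        int openCalls = 0;
        int closeCalls = 0;

        @Override
        public void open() {
            openCalls++;
        }

        @Override
        public void close() {
            closeCalls++;
        }
    }

    // Hypothetical operator that owns the function and forwards its own lifecycle.
    static class Operator {
        private final GeneratedFunction function;
        private boolean opened = false;

        Operator(GeneratedFunction function) {
            this.function = function;
        }

        void open() {
            function.open();
            opened = true;
        }

        // Forward close() only if open() ran before, so the two calls stay paired.
        void close() {
            if (opened) {
                function.close();
                opened = false;
            }
        }
    }
}
```

If the operator never forwards `close()`, any tear-down logic in the user-defined aggregate function is silently skipped, which is the situation the test change mentioned above is meant to catch.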
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135179322 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.reflect.Field + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo} + +/** + * Data view specification. + * + * @tparam ACC type extends [[DataView]] + */ +trait DataViewSpec[ACC <: DataView] { + def id: String + def field: Field + def toStateDescriptor: StateDescriptor[_, _] --- End diff -- Maybe this method is still needed. State descriptors cannot be code generated because the TypeInformation is passed in by users.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135179087 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,108 @@ class AggregationCodeGenerator( } } +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig != null && accConfig.isStateBackedDataViews) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.accSpecs(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) --- End diff -- State descriptors need the TypeInformation that is passed in by the user, so we cannot code generate the state descriptor.
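The approach in the diff is to serialize the descriptor object to a string that can be embedded in generated code and decoded again at runtime. A minimal sketch of that round trip in plain Java follows. It uses `java.io` object serialization and `java.util.Base64` as stand-ins for Flink's `InstantiationUtil` and the commons-codec `Base64` used in the PR; `FakeDescriptor` and `DescriptorCodec` are hypothetical names for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical stand-in for a state descriptor carrying user-supplied type information.
class FakeDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    final String keyType;
    final String valueType;

    FakeDescriptor(String name, String keyType, String valueType) {
        this.name = name;
        this.keyType = keyType;
        this.valueType = valueType;
    }
}

class DescriptorCodec {

    // Serialize the object and encode the bytes as a URL-safe Base64 string,
    // so the result can be pasted into generated source as a string literal.
    static String serialize(Serializable descriptor) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(descriptor);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.toByteArray());
    }

    // Decode the string and rebuild the object at runtime.
    static Object deserialize(String data) {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The encoded string contains only letters, digits, `-` and `_`, so it needs no escaping inside a generated string literal. The trade-off is that the generated code is no longer readable at that point, which is why the reviewers discuss whether the descriptor could be instantiated by generated code instead.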
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135179101 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -178,13 +294,15 @@ class AggregationCodeGenerator( | ${aggMapping(i)}, | (${accTypes(i)}) accs.getField($i));""".stripMargin } else { +val setDataView = genDataViewFieldSetter(s"acc$i", i) j""" |org.apache.flink.table.functions.AggregateFunction baseClass$i = | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - | + |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); + |$setDataView --- End diff -- yes
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135173258 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1469,4 +1526,26 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } + + @throws[Exception] + def serialize(stateDescriptor: StateDescriptor[_, _]): String = { +val byteArray = InstantiationUtil.serializeObject(stateDescriptor) +Base64.encodeBase64URLSafeString(byteArray) + } + + @throws[Exception] + def deserialize(data: String): StateDescriptor[_, _] = { +val byteData = Base64.decodeBase64(data) +InstantiationUtil.deserializeObject[StateDescriptor[_, _]]( + byteData, + Thread.currentThread.getContextClassLoader) + } + + def createDataViewTerm(aggIndex: Int, fieldName: String): String = { +s"acc${aggIndex}_${fieldName}_dataview" + } } + +case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], isStateBackedDataViews: Boolean) --- End diff -- Yes, this class is not needed now.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135172148 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -759,10 +786,12 @@ object AggregateUtil { : GroupCombineFunction[Row, Row] = { val needRetract = false -val (aggFieldIndexes, aggregates, accTypes) = transformToAggregateFunctions( +val isStateBackedDataViews = false +val (aggFieldIndexes, aggregates, accTypes, accSpecs) = transformToAggregateFunctions( --- End diff -- That makes sense.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135171980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -356,6 +486,9 @@ class AggregationCodeGenerator( """.stripMargin if (needMerge) { +if (accConfig != null && accConfig.isStateBackedDataViews) { + throw new CodeGenException("DataView doesn't support merge when the backend uses state.") --- End diff -- I found that the exception will be thrown everywhere generateAggregations(..) is called. I think we can print funcName in the exception message.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135163096 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { + + @Override + public CountDistinctAccum createAccumulator() { + CountDistinctAccum accum = new CountDistinctAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); + accum.count = 0L; + return accum; + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, String id) { + try { + if (!accumulator.map.contains(id)) { + accumulator.map.put(id, 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.put(String.valueOf(id), 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Long getValue(CountDistinctAccum accumulator) { + return accumulator.count; + } + } + + /** +* CountDistinct aggregate with merge. 
+*/ + public static class CountDistinctWithMerge extends CountDistinct { + + //Overloaded merge method + public void merge(CountDistinctAccum acc, Iterable it) { + Iterator iter = it.iterator(); + while (iter.hasNext()) { + CountDistinctAccum mergeAcc = iter.next(); + acc.count += mergeAcc.count; + + try { + Iterator mapItr = mergeAcc.map.keys().iterator(); + while (mapItr.hasNext()) { + String key = mapItr.next(); + if (!acc.map.contains(key)) { + acc.map.put(key, 1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + /** +* CountDistinct aggregate with merge and reset. +*/ + public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge { + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** +* CountDistinct aggregate with retract. +*/ + public static class CountDistinctWithRetractAndReset extends CountDistinct { + + //Overloaded retract method + public void retract(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.remove(String.valueOf(id)); + accumulator.count -= 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear();
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135162070 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1469,4 +1526,26 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } + + @throws[Exception] + def serialize(stateDescriptor: StateDescriptor[_, _]): String = { +val byteArray = InstantiationUtil.serializeObject(stateDescriptor) +Base64.encodeBase64URLSafeString(byteArray) + } + + @throws[Exception] + def deserialize(data: String): StateDescriptor[_, _] = { +val byteData = Base64.decodeBase64(data) +InstantiationUtil.deserializeObject[StateDescriptor[_, _]]( + byteData, + Thread.currentThread.getContextClassLoader) + } + + def createDataViewTerm(aggIndex: Int, fieldName: String): String = { --- End diff -- Yes, this was just an omission during code refactoring.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135112454 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[MapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. 
+ */ +@Internal +class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V]) + extends TypeSerializer[MapView[K, V]] { + + override def isImmutableType: Boolean = false + + override def duplicate(): TypeSerializer[MapView[K, V]] = +new MapViewSerializer[K, V]( + mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]]) + + override def createInstance(): MapView[K, V] = { +val mapview = new MapView[K, V] +mapview.putAll(mapSerializer.createInstance()) --- End diff -- No need to add anything to the new instance. The map should be empty.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135128323 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { + + @Override + public CountDistinctAccum createAccumulator() { + CountDistinctAccum accum = new CountDistinctAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); + accum.count = 0L; + return accum; + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, String id) { + try { + if (!accumulator.map.contains(id)) { + accumulator.map.put(id, 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.put(String.valueOf(id), 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Long getValue(CountDistinctAccum accumulator) { + return accumulator.count; + } + } + + /** +* CountDistinct aggregate with merge. 
+*/ + public static class CountDistinctWithMerge extends CountDistinct { + + //Overloaded merge method + public void merge(CountDistinctAccum acc, Iterable it) { + Iterator iter = it.iterator(); + while (iter.hasNext()) { + CountDistinctAccum mergeAcc = iter.next(); + acc.count += mergeAcc.count; + + try { + Iterator mapItr = mergeAcc.map.keys().iterator(); + while (mapItr.hasNext()) { + String key = mapItr.next(); + if (!acc.map.contains(key)) { + acc.map.put(key, 1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + /** +* CountDistinct aggregate with merge and reset. +*/ + public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge { + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** +* CountDistinct aggregate with retract. +*/ + public static class CountDistinctWithRetractAndReset extends CountDistinct { + + //Overloaded retract method + public void retract(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.remove(String.valueOf(id)); --- End diff -- shouldn't a count distinct with retraction increment the counter value in the MapView in `accumulate` and decrement the counter in `retract`? Only when the counter reaches 0, the map entry should be removed and `accumulator.count` be decremented. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project d
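The reference-counting scheme suggested in this comment can be sketched as follows. This is an illustration in plain Java, not the test code from the PR: a `java.util.HashMap` stands in for `MapView`, and the class name `RefCountedDistinct` is hypothetical. Each key maps to the number of times it is currently present, and the distinct count changes only when a key appears for the first time or its counter drops to zero.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical count-distinct accumulator with retraction support.
class RefCountedDistinct {

    // key -> how many times the key is currently present (HashMap stands in for MapView)
    private final Map<String, Integer> refCounts = new HashMap<>();
    private long distinctCount = 0L;

    void accumulate(String id) {
        // Increment the per-key counter; a result of 1 means the key is new.
        int updated = refCounts.merge(id, 1, Integer::sum);
        if (updated == 1) {
            distinctCount++;
        }
    }

    void retract(String id) {
        Integer current = refCounts.get(id);
        if (current == null) {
            // Nothing to retract for an unknown key.
            return;
        }
        if (current == 1) {
            // Last occurrence: drop the entry and decrement the distinct count.
            refCounts.remove(id);
            distinctCount--;
        } else {
            refCounts.put(id, current - 1);
        }
    }

    long getValue() {
        return distinctCount;
    }
}
```

With this scheme, accumulating the same key twice and retracting it once leaves the distinct count unchanged, which a plain presence check on the map cannot express.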
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135140565 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.reflect.Field + +import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor} +import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo} + +/** + * Data view specification. + * + * @tparam ACC type extends [[DataView]] + */ +trait DataViewSpec[ACC <: DataView] { + def id: String + def field: Field + def toStateDescriptor: StateDescriptor[_, _] --- End diff -- Do we need this method? State descriptors can be instantiated by generated code.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135126762 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function { * aggregated results */ def resetAccumulator(accumulators: Row) + + /** +* Cleanup for the accumulators. +*/ + def cleanup() + + /** +* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]]. +* It can be used for clean up work. By default, this method does nothing. +*/ + def close() --- End diff -- I think `close()` is never called. So we can remove it, right?
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135109036 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[ListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, --- End diff -- lost -> list, remove the `` tag
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135121779 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1469,4 +1526,26 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } + + @throws[Exception] + def serialize(stateDescriptor: StateDescriptor[_, _]): String = { +val byteArray = InstantiationUtil.serializeObject(stateDescriptor) +Base64.encodeBase64URLSafeString(byteArray) + } + + @throws[Exception] + def deserialize(data: String): StateDescriptor[_, _] = { +val byteData = Base64.decodeBase64(data) +InstantiationUtil.deserializeObject[StateDescriptor[_, _]]( + byteData, + Thread.currentThread.getContextClassLoader) + } + + def createDataViewTerm(aggIndex: Int, fieldName: String): String = { +s"acc${aggIndex}_${fieldName}_dataview" + } } + +case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], isStateBackedDataViews: Boolean) --- End diff -- We only need a `DataViewConfig` if the views are backed by state, right? So we can remove the `boolean` `isStateBackedDataViews` field and make it an optional parameter of the code generator.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135112753 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.MapSerializer +import org.apache.flink.table.api.dataview.MapView + +/** + * [[MapView]] type information. + * + * @param keyType key type information + * @param valueType value type information + * @tparam K key type + * @tparam V value type + */ +@PublicEvolving --- End diff -- Please remove the annotation
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135125473 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1469,4 +1526,26 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } + + @throws[Exception] + def serialize(stateDescriptor: StateDescriptor[_, _]): String = { +val byteArray = InstantiationUtil.serializeObject(stateDescriptor) +Base64.encodeBase64URLSafeString(byteArray) + } + + @throws[Exception] + def deserialize(data: String): StateDescriptor[_, _] = { +val byteData = Base64.decodeBase64(data) +InstantiationUtil.deserializeObject[StateDescriptor[_, _]]( + byteData, + Thread.currentThread.getContextClassLoader) + } + + def createDataViewTerm(aggIndex: Int, fieldName: String): String = { --- End diff -- Move this method to `AggregationCodeGenerator`?
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135131075 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -79,13 +95,15 @@ class AggregationCodeGenerator( outputArity: Int, needRetract: Boolean, needMerge: Boolean, -needReset: Boolean) +needReset: Boolean, +accConfig: DataViewConfig) --- End diff -- Make this an `Option[Array[Seq[DataViewSpec[_]]]]` with a `None` default value?
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135133385 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,108 @@ class AggregationCodeGenerator( } } +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig != null && accConfig.isStateBackedDataViews) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.accSpecs(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) --- End diff -- The code is not `ListView` specific. Change to `DataView`?
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135125391 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1469,4 +1526,26 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } + + @throws[Exception] + def serialize(stateDescriptor: StateDescriptor[_, _]): String = { --- End diff -- Do we need this method? The instantiation of state descriptors can be code generated.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135135021 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,108 @@ class AggregationCodeGenerator( } } +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig != null && accConfig.isStateBackedDataViews) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.accSpecs(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) +val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName) +val field = + s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + +// create DataViews +val descFieldTerm = s"${dataViewFieldTerm}_desc" +val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName +val descDeserialize = + s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} + | .deserialize("$serializedData"); + """.stripMargin +val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) { + s""" + |$descDeserialize + |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView( + | $contextTerm.getMapState(( + | org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm)); + """.stripMargin --- End diff -- indent
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135134452 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,108 @@ class AggregationCodeGenerator( } } +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig != null && accConfig.isStateBackedDataViews) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.accSpecs(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) --- End diff -- Why do we need to serialize and deserialize state descriptors? Can't we just generate the code to instantiate them? IMO, that would be more straightforward and easier to debug.
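Editorial sketch (not part of the PR): the trade-off raised in this comment can be illustrated with plain JDK classes. `DescriptorStandIn` is a hypothetical stand-in for a Flink `StateDescriptor`, the class and method names are assumptions made for illustration, and `java.util.Base64` is swapped in for the commons-codec calls used in the diff. The first generator embeds an opaque Base64 blob in the generated source, as the diff does; the second emits a readable constructor call, roughly what the comment asks for.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

final class DescriptorEmbedding {

    /** Hypothetical stand-in for a state descriptor; this is NOT a Flink class. */
    static final class DescriptorStandIn implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final String valueType;

        DescriptorStandIn(String name, String valueType) {
            this.name = name;
            this.valueType = valueType;
        }
    }

    /** Java-serializes the object and encodes the bytes as unpadded URL-safe Base64 text. */
    static String serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    /** Reverses serialize(): decodes the text and reads the object back. */
    static Object deserialize(String data) {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Approach in the diff: the generated source carries an opaque blob. */
    static String generateWithBlob(String fieldTerm, DescriptorStandIn desc) {
        return "DescriptorStandIn " + fieldTerm
            + " = (DescriptorStandIn) DescriptorEmbedding.deserialize(\""
            + serialize(desc) + "\");";
    }

    /** Approach suggested in the comment: the generated source spells out the constructor call. */
    static String generateInstantiation(String fieldTerm, DescriptorStandIn desc) {
        return "DescriptorStandIn " + fieldTerm + " = new DescriptorStandIn(\""
            + desc.name + "\", \"" + desc.valueType + "\");";
    }
}
```

The second form is easier to read in a dump of the generated class, which is the debugging benefit the comment mentions. The sketch sidesteps the harder part of the real case: a real descriptor also carries type or serializer information, which the generated code would need some way to construct.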
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135110707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[ListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](val listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = false + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = { +val listview = new ListView[T] +listview.addAll(listSerializer.createInstance()) +listview + } + + override def copy(from: ListView[T]): ListView[T] = { +val listview = new ListView[T] +listview.addAll(listSerializer.copy(from.list)) +listview + } + + override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from) + + override def getLength: Int = -1 + + override def serialize(record: ListView[T], target: DataOutputView): Unit = { +listSerializer.serialize(record.list, target) + } + + override def deserialize(source: DataInputView): ListView[T] = { +val listview = new ListView[T] +listview.addAll(listSerializer.deserialize(source)) --- End diff -- Same as before. We could directly set the list.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135125423 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1469,4 +1526,26 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } + + @throws[Exception] + def serialize(stateDescriptor: StateDescriptor[_, _]): String = { +val byteArray = InstantiationUtil.serializeObject(stateDescriptor) +Base64.encodeBase64URLSafeString(byteArray) + } + + @throws[Exception] + def deserialize(data: String): StateDescriptor[_, _] = { --- End diff -- Do we need this method? The instantiation of state descriptors can be code generated.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135124865 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1398,14 +1440,29 @@ object AggregateUtil { } } +val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size) + // create accumulator type information for every aggregate function aggregates.zipWithIndex.foreach { case (agg, index) => - if (null == accTypes(index)) { + if (accTypes(index) != null) { +val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index, + agg, + accTypes(index), + isStateBackedDataViews) +if (specs.isDefined) { + accSpecs(index) = specs.get + accTypes(index) = accType +} else { + accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg) --- End diff -- use same order as above: ``` accSpecs(index) = ... accTypes(index) = ```
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135112662 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[MapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. 
+ */ +@Internal +class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V]) + extends TypeSerializer[MapView[K, V]] { + + override def isImmutableType: Boolean = false + + override def duplicate(): TypeSerializer[MapView[K, V]] = +new MapViewSerializer[K, V]( + mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]]) + + override def createInstance(): MapView[K, V] = { +val mapview = new MapView[K, V] +mapview.putAll(mapSerializer.createInstance()) +mapview + } + + override def copy(from: MapView[K, V]): MapView[K, V] = { +val mapview = new MapView[K, V] +mapview.putAll(mapSerializer.copy(from.map)) --- End diff -- Same as for `ListStateSerializer`. Copying one list into another is more expensive than just replacing the list instance.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135109902 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[ListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](val listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = false + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = { +val listview = new ListView[T] +listview.addAll(listSerializer.createInstance()) --- End diff -- I think this call is not necessary. The new list instance is empty, so nothing is added.
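Editorial sketch (not part of the PR): the serializer comments in this thread contrast copying elements into a fresh view with adopting an already-created list as the view's backing list. The JDK-only example below shows both patterns. `SimpleListView` and `ListViewCopyDemo` are hypothetical, simplified stand-ins, not the Flink classes under review.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for ListView; this is NOT the Flink class. */
final class SimpleListView<T> {
    List<T> list;

    SimpleListView() {
        this(new ArrayList<>());
    }

    SimpleListView(List<T> list) {
        this.list = list;
    }

    void addAll(List<T> elements) {
        list.addAll(elements);
    }
}

final class ListViewCopyDemo {

    /** Pattern from the diff: allocate an empty backing list, then copy every element into it. */
    static <T> SimpleListView<T> viaAddAll(List<T> deserialized) {
        SimpleListView<T> view = new SimpleListView<>();
        view.addAll(deserialized);
        return view;
    }

    /** Suggested pattern: adopt the freshly created list as the backing list, with no copy. */
    static <T> SimpleListView<T> viaDirectSet(List<T> deserialized) {
        return new SimpleListView<>(deserialized);
    }
}
```

Adopting the list is only safe when no other code keeps a reference to it and mutates it later. That appears to hold for a list freshly produced by a serializer's `deserialize` or `copy`, which is the situation the comments describe.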
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135117112 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,139 @@ object UserDefinedFunctionUtils { // -- /** +* Analyze the constructor to get the type information of the MapView or ListView type variables +* inside the accumulate. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc).asInstanceOf[MapView[_, _]] + if (mapView != null) { +val keyTypeInfo = mapView.keyTypeInfo +val valueTypeInfo = mapView.valueTypeInfo + +if (keyTypeInfo != null && valueTypeInfo != null) { + resultMap.put(field.getName, new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)) +} + } else { +resultMap.put(field.getName, null) + } +} else if (field.getType == classOf[ListView[_]]) { + val listView = field.get(acc).asInstanceOf[ListView[_]] + val elementTypeInfo = listView.elementTypeInfo + + if (elementTypeInfo != null) { +resultMap.put(field.getName, new ListViewTypeInfo(elementTypeInfo)) + } +} + } +} + +resultMap + } + + /** +* Remove StateView fields from accumulator type information. 
+* +* @param index index of aggregate function +* @param aggFun aggregate function +* @param accType accumulator type information, only support pojo type +* @param isStateBackedDataViews is data views use state backend +* @return mapping of accumulator type information and data view config which contains id, +* field name and state descriptor +*/ + def removeStateViewFieldsFromAccTypeInfo( +index: Int, +aggFun: AggregateFunction[_, _], +accType: TypeInformation[_], +isStateBackedDataViews: Boolean) + : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = { + +var hasDataView = false +val acc = aggFun.createAccumulator() +accType match { + case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 => +val arity = pojoType.getArity +val newPojoFields = new util.ArrayList[PojoField]() +val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]] +for (i <- 0 until arity) { + val pojoField = pojoType.getPojoFieldAt(i) + val field = pojoField.getField + val fieldName = field.getName + field.setAccessible(true) + + pojoField.getTypeInformation match { +case map: MapViewTypeInfo[Any, Any] => + val mapView = field.get(acc).asInstanceOf[MapView[_, _]] + if (mapView != null) { +val keyTypeInfo = mapView.keyTypeInfo +val valueTypeInfo = mapView.valueTypeInfo +val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) { + hasDataView = true + new MapViewTypeInfo(keyTypeInfo, valueTypeInfo) +} else { + map +} + +var spec = MapViewSpec( + "agg" + index + "$" + fieldName, // generate unique name to be used as state name + field, + newTypeInfo) + +accumulatorSpecs += spec +if (!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) +} + } + +case list: ListViewTypeInfo[Any] => + val listView = field.get(acc).asInstanceOf[ListView[_]] + if (listView != null) { +val elementTypeInfo = listView.elementTypeInfo +
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135135051 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,108 @@ class AggregationCodeGenerator( } } +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig != null && accConfig.isStateBackedDataViews) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.accSpecs(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) +val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName) +val field = + s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + +// create DataViews +val descFieldTerm = s"${dataViewFieldTerm}_desc" +val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName +val descDeserialize = + s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} + | .deserialize("$serializedData"); + """.stripMargin +val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) { + s""" + |$descDeserialize + |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView( + | $contextTerm.getMapState(( + | org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm)); + """.stripMargin +} else if (dataViewField.getType == classOf[ListView[_]]) { + s""" + |$descDeserialize + |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateListView( + | $contextTerm.getListState(( + | org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm)); + """.stripMargin --- End diff -- indent
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135134994 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -161,13 +182,108 @@ class AggregationCodeGenerator( } } +/** + * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup, + * close and member area of the generated function. + * + */ +def addReusableDataViews: Unit = { + if (accConfig != null && accConfig.isStateBackedDataViews) { +val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + +for (i <- aggs.indices) yield { + for (spec <- accConfig.accSpecs(i)) yield { +val dataViewField = spec.field +val dataViewTypeTerm = dataViewField.getType.getCanonicalName +val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + +// define the DataView variables +val serializedData = AggregateUtil.serialize(desc) +val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName) +val field = + s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + +// create DataViews +val descFieldTerm = s"${dataViewFieldTerm}_desc" +val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName +val descDeserialize = + s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} + | .deserialize("$serializedData"); + """.stripMargin --- End diff -- indent
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135112227 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[ListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. + */ +@Internal --- End diff -- Please remove the annotation. 
We don't use the `@Internal`, `@Public`, and `@PublicEvolving` annotations in the `flink-table` module.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135124134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -759,10 +786,12 @@ object AggregateUtil { : GroupCombineFunction[Row, Row] = { val needRetract = false -val (aggFieldIndexes, aggregates, accTypes) = transformToAggregateFunctions( +val isStateBackedDataViews = false +val (aggFieldIndexes, aggregates, accTypes, accSpecs) = transformToAggregateFunctions( --- End diff -- `transformToAggregateFunctions` has a default value for `isStateBackedDataViews`. So we don't need to pass a parameter. Moreover, we only need to pass data view information to the code generator if the data views are backed by state (if we make it an optional parameter). So most of the changes here can be reverted (and also for all other operators that do not support state backed views).
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135107609 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView provides Map functionality for accumulators used by user-defined aggregate functions + * [[org.apache.flink.table.functions.AggregateFunction]]. + * + * A MapView can be backed by a Java HashMap or a state backend, depending on the context in + * which the function is used. + * + * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]] + * when use state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *public void accumulate(MyAccum accumulator, String id) { + * try { + * if (!accumulator.map.contains(id)) { + *accumulator.map.put(id, 1); + *accumulator.count++; + * } + * } catch (Exception e) { + *e.printStackTrace(); + * } + *} + * + *@Override + *public Long getValue(MyAccum accumulator) { + * return accumulator.count; + *} + * } + * + * }}} + * + * @param keyTypeInfo key type information + * @param valueTypeInfo value type information + * @tparam K key type + * @tparam V value type + */ +@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]]) +class MapView[K, V]( +@transient private[flink] val keyTypeInfo: TypeInformation[K], +@transient private[flink] val valueTypeInfo: TypeInformation[V]) + extends DataView { + + def this() = this(null, null) + + private[flink] var map = new util.HashMap[K, V]() + + /** +* Returns the value to which the specified key is mapped, or { @code null } if this map +* contains no mapping for the key. +* +* @param key The key of the mapping. +* @return The value of the mapping with the given key. +* @throws Exception Thrown if the system cannot get data. +*/ + @throws[Exception] + def get(key: K): V = map.get(key) + + /** +* Put a value with the given key into the map. +* +* @param key The key of the mapping. +* @param value The new value of the mapping. +* @throws Exception Thrown if the system cannot put data. +*/ + @throws[Exception] + def put(key: K, value: V): Unit = map.put(key, value) + + /** +* Copies all of the mappings from the specified map to this map view. +* +* @param map The mappings to be stored in this map. 
+* @throws Exception Thrown if the system cannot access the map. +*/ + @throws[Exception] + def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map) + + /** +* Deletes the mapping of the given key. +* +* @param key The key of the mapping. +* @throws Exception Thrown if the system cannot access the map. +*/ + @throws[Exception] + def remove(key: K): Unit = map.remove(key) + + /** +* Returns whether there exists the given mapping. +* +* @param key The
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135110511 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.ListView + +/** + * A serializer for [[ListView]]. The serializer relies on an element + * serializer for the serialization of the list's elements. + * + * The serialization format for the list is as follows: four bytes for the length of the lost, + * followed by the serialized representation of each element. + * + * @param listSerializer List serializer. + * @tparam T The type of element in the list. 
+ */ +@Internal +class ListViewSerializer[T](val listSerializer: ListSerializer[T]) + extends TypeSerializer[ListView[T]] { + + override def isImmutableType: Boolean = false + + override def duplicate(): TypeSerializer[ListView[T]] = { +new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]]) + } + + override def createInstance(): ListView[T] = { +val listview = new ListView[T] +listview.addAll(listSerializer.createInstance()) +listview + } + + override def copy(from: ListView[T]): ListView[T] = { +val listview = new ListView[T] +listview.addAll(listSerializer.copy(from.list)) --- End diff -- `addAll()` adds overhead because all elements are copied from one list to the other. Can't the serializer have direct access to the list field of the `ListView` and set the list copy as the `ListView`'s list?
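The overhead described in this comment can be illustrated with a minimal Java sketch. Everything in it is hypothetical: `SimpleListView` and `copyList` are stand-ins invented for illustration, not the Flink `ListView` or `ListSerializer`. Note also that the `ListView` diff quoted elsewhere in this thread declares `list` as a `val`, so the direct-assignment variant would presumably require that field to become assignable.

```java
import java.util.ArrayList;
import java.util.List;

class ListViewCopySketch {

    /** Hypothetical stand-in for ListView; this is not the Flink class. */
    static final class SimpleListView<T> {
        List<T> list = new ArrayList<>();

        void addAll(List<T> values) {
            list.addAll(values);
        }
    }

    /** Hypothetical stand-in for the list serializer's copy: one pass over the elements. */
    static <T> List<T> copyList(List<T> from) {
        return new ArrayList<>(from);
    }

    /** Copies the elements, then copies them a second time through addAll. */
    static <T> SimpleListView<T> copyWithAddAll(SimpleListView<T> from) {
        SimpleListView<T> view = new SimpleListView<>();
        view.addAll(copyList(from.list));
        return view;
    }

    /** Copies the elements once and installs the copy as the backing list. */
    static <T> SimpleListView<T> copyWithDirectAssignment(SimpleListView<T> from) {
        SimpleListView<T> view = new SimpleListView<>();
        view.list = copyList(from.list);
        return view;
    }

    public static void main(String[] args) {
        SimpleListView<String> source = new SimpleListView<>();
        source.list.add("a");
        source.list.add("b");

        SimpleListView<String> viaAddAll = copyWithAddAll(source);
        SimpleListView<String> viaAssignment = copyWithDirectAssignment(source);

        // Both variants produce the same contents; only the number of passes differs.
        System.out.println(viaAddAll.list.equals(viaAssignment.list));
    }
}
```

Both variants yield an independent list with the same elements; the second simply avoids iterating over the copied elements again.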
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135112047 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import java.util + +import org.apache.flink.annotation.Internal +import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.table.api.dataview.MapView + +/** + * A serializer for [[MapView]]. The serializer relies on a key serializer and a value + * serializer for the serialization of the map's key-value pairs. + * + * The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, + * each value is prefixed by a null marker. + * + * @param mapSerializer Map serializer. + * @tparam K The type of the keys in the map. + * @tparam V The type of the values in the map. 
+ */ +@Internal --- End diff -- Please remove the annotation.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135107708 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * when use state backend.. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T]( +@transient private[flink] val elementTypeInfo: TypeInformation[T]) + extends DataView { + + def this() = this(null) + + private[flink] val list = new util.ArrayList[T]() + + /** +* Returns an iterable of the list. +* +* @throws Exception Thrown if the system cannot get data. +* @return The iterable of the list or { @code null} if the list is empty. +*/ + @throws[Exception] + def get: JIterable[T] = { +if (!list.isEmpty) { + list +} else { + null +} + } + + /** +* Adding the given value to the list. +* +* @throws Exception Thrown if the system cannot add data. +* @param value element to be appended to this list +*/ + @throws[Exception] + def add(value: T): Unit = list.add(value) + + /** +* Copies all of the elements from the specified list to this list view. +* +* @throws Exception Thrown if the system cannot add all data. +* @param list The list to be stored in this list view. +*/ + @throws[Exception] + def addAll(list: util.List[T]): Unit = this.list.addAll(list) + + /** +* Removes all of the elements from this list. +* +* The list will be empty after this call returns. 
+*/ + override def clear(): Unit = list.clear() + + override def equals(other: Any): Boolean = other match { +case that: ListView[_] => --- End diff -- `case that: ListView[T] =>`?
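One point that may be relevant to this question is type erasure: on the JVM the element type of a generic class is not available at runtime, so a type test can only check the class itself, not `T`. The following is a rough Java analogue, assuming a hypothetical `SimpleListView` class (not the Flink `ListView`); it uses a wildcard type test and then compares the backing lists.

```java
import java.util.ArrayList;
import java.util.List;

class ListViewEqualsSketch {

    /** Hypothetical stand-in for ListView; this is not the Flink class. */
    static final class SimpleListView<T> {
        final List<T> list = new ArrayList<>();

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            // Only the class can be tested at runtime; the element type is erased.
            if (!(other instanceof SimpleListView<?>)) {
                return false;
            }
            return list.equals(((SimpleListView<?>) other).list);
        }

        @Override
        public int hashCode() {
            return list.hashCode();
        }
    }

    public static void main(String[] args) {
        SimpleListView<String> strings = new SimpleListView<>();
        SimpleListView<Integer> numbers = new SimpleListView<>();

        // Two empty views compare equal even though their declared element types differ.
        System.out.println(strings.equals(numbers));

        strings.list.add("a");
        numbers.list.add(1);
        System.out.println(strings.equals(numbers));
    }
}
```

In this sketch equality is decided entirely by the contents of the backing lists, which is why the declared element type plays no role in the result.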
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135138596 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -178,13 +294,15 @@ class AggregationCodeGenerator( | ${aggMapping(i)}, | (${accTypes(i)}) accs.getField($i));""".stripMargin } else { +val setDataView = genDataViewFieldSetter(s"acc$i", i) j""" |org.apache.flink.table.functions.AggregateFunction baseClass$i = | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - | + |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i); + |$setDataView --- End diff -- Could we call `genDataViewFieldSetter(s"acc$i", i)` directly here? That would make it easier to follow the `acc$i` variables.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135139164 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -356,6 +486,9 @@ class AggregationCodeGenerator( """.stripMargin if (needMerge) { +if (accConfig != null && accConfig.isStateBackedDataViews) { + throw new CodeGenException("DataView doesn't support merge when the backend uses state.") --- End diff -- Can we throw this exception earlier (e.g., in `AggregateUtil`) and give more details about the aggregation function?
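The suggestion above amounts to validating the configuration before any code is generated and naming the offending function in the message. A minimal Java sketch of that idea follows; the method `checkMergeSupported`, its parameters, the exception type, and the message wording are all hypothetical and are not taken from `AggregateUtil`.

```java
class MergeValidationSketch {

    /**
     * Hypothetical up-front check: rejects the unsupported combination before
     * code generation starts and reports which aggregate function caused it.
     */
    static void checkMergeSupported(
            String aggregateName, boolean needMerge, boolean stateBackedDataViews) {
        if (needMerge && stateBackedDataViews) {
            throw new IllegalStateException(
                "Aggregate function '" + aggregateName + "' requires merge(), which is not "
                    + "supported when its data views are backed by state.");
        }
    }

    public static void main(String[] args) {
        // Supported combination: no exception is thrown.
        checkMergeSupported("CountDistinct", false, true);

        try {
            checkMergeSupported("CountDistinct", true, true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at this stage keeps the error close to the user's query definition rather than surfacing it from inside the generated code path.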
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135128744 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { + + @Override + public CountDistinctAccum createAccumulator() { + CountDistinctAccum accum = new CountDistinctAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); + accum.count = 0L; + return accum; + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, String id) { + try { + if (!accumulator.map.contains(id)) { + accumulator.map.put(id, 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.put(String.valueOf(id), 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Long getValue(CountDistinctAccum accumulator) { + return accumulator.count; + } + } + + /** +* CountDistinct aggregate with merge. 
+*/ + public static class CountDistinctWithMerge extends CountDistinct { + + //Overloaded merge method + public void merge(CountDistinctAccum acc, Iterable it) { + Iterator iter = it.iterator(); + while (iter.hasNext()) { + CountDistinctAccum mergeAcc = iter.next(); + acc.count += mergeAcc.count; + + try { + Iterator mapItr = mergeAcc.map.keys().iterator(); + while (mapItr.hasNext()) { + String key = mapItr.next(); + if (!acc.map.contains(key)) { + acc.map.put(key, 1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + /** +* CountDistinct aggregate with merge and reset. +*/ + public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge { + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** +* CountDistinct aggregate with retract. +*/ + public static class CountDistinctWithRetractAndReset extends CountDistinct { + + //Overloaded retract method + public void retract(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.remove(String.valueOf(id)); + accumulator.count -= 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); +
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135112266 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.ListSerializer +import org.apache.flink.table.api.dataview.ListView + +/** + * [[ListView]] type information. + * + * @param elementType element type information + * @tparam T element type + */ +@PublicEvolving --- End diff -- Please remove the annotation.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135108408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1458,9 +1459,10 @@ abstract class CodeGenerator( * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]]. * * @param function [[UserDefinedFunction]] object to be instantiated during runtime +* @param contextTerm [[RuntimeContext]] term --- End diff -- `term to access the [[RuntimeContext]]`
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135106244 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * when use state backend.. --- End diff -- `when use state backend..` -> `if it is backed by a state backend.`
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135107137 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView provides Map functionality for accumulators used by user-defined aggregate functions + * [[org.apache.flink.table.functions.AggregateFunction]]. + * + * A MapView can be backed by a Java HashMap or a state backend, depending on the context in + * which the function is used. + * + * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]] + * when use state backend. --- End diff -- `when use state backend` -> `if it is backed by a state backend.`
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134653998 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable class to the member area of the generated [[Function]]. +*/ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { +val field = + s""" + |transient ${clazz.getCanonicalName} $fieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + } + + /** +* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]]. +* +* @param indices indices of aggregate functions. +* @param ctxTerm field name of runtime context. +* @param accConfig data view config which contains id, field and StateDescriptos. +* @return statements to create [[MapView]] or [[ListView]]. +*/ + def addReusableDataViewConfig( + indices: Range, + ctxTerm: String, + accConfig: Option[DataViewConfig]) +: String = { +if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { + val initDataViews = new StringBuilder + val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs +.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) +.toMap[String, StateDescriptor[_, _]] + + for (i <- indices) yield { +for (spec <- accConfig.get.accSpecs(i)) yield { + val dataViewField = spec.field + val dataViewTypeTerm = dataViewField.getType.getCanonicalName + val desc = descMapping.getOrElse(spec.id, +throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + + val serializedData = AggregateUtil.serialize(desc) + val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview" + val field = +s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + |""".stripMargin + reusableMemberStatements.add(field) + + val descFieldTerm = s"${dataViewFieldTerm}_desc" + val descClassQualifier 
= classOf[StateDescriptor[_, _]].getCanonicalName + val descDeserialize = +s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} + | .deserialize("$serializedData"); + """.stripMargin + + val init = if (dataViewField.getType == classOf[MapView[_, _]]) { +s""" + |$descDeserialize + |$dataViewFieldTerm = + | org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm, --- End diff -- Agreed, it can be code-generated now.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134653928 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable class to the member area of the generated [[Function]]. +*/ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { +val field = + s""" + |transient ${clazz.getCanonicalName} $fieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + } + + /** +* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]]. +* +* @param indices indices of aggregate functions. +* @param ctxTerm field name of runtime context. +* @param accConfig data view config which contains id, field and StateDescriptos. +* @return statements to create [[MapView]] or [[ListView]]. +*/ + def addReusableDataViewConfig( --- End diff -- It's a good idea; the code will be cleaner after the refactoring.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134653871 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable class to the member area of the generated [[Function]]. +*/ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { --- End diff -- Yes, the RuntimeContext does not need to be a member variable.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134135181 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -162,13 +172,66 @@ class AggregationCodeGenerator( } } +def genDataViewFieldSetter(accTerm: String, specs: Seq[DataViewSpec[_]]): String = { + if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { +val setters = for (spec <- specs) yield { + val field = spec.field + val dataViewTerm = s"${accTerm}_${field.getName}_dataview" + val fieldSetter = if (Modifier.isPublic(field.getModifiers)) { +s"$accTerm.${field.getName} = $dataViewTerm;" + } else { +val fieldTerm = addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName) +s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, dataViewTerm)};" + } + + s""" + |$fieldSetter +""".stripMargin +} +setters.mkString("\n") + } else { +"" + } +} + +def genCleanUpDataView: String = { + if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { +val cleanUpDataViews = new StringBuilder +for (i <- aggs.indices) yield { + val setters = for (spec <- accConfig.get.accSpecs(i)) yield { +val dataViewTerm = s"acc${i}_${spec.field.getName}_dataview" +val cleanUp = + s""" +|$dataViewTerm.clear(); + """.stripMargin +cleanUpDataViews.append(cleanUp) + } +} + +cleanUpDataViews.toString() + } else { +"" + } +} + +def genInitialize: String = { + +j""" + | public final void initialize( --- End diff -- I would like to rename the method name to `open(ctx)`. So that we can use the `reusableOpenStatements` and `reuseOpenCode()` of `CodeGenerator` to generate the content of `open`. Currently, the `genInitialize` is somewhat ambiguous to `reuseInitCode()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134151688 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/StateViewUtils.scala --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.dataview + +import java.util +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.api.common.state._ +import org.apache.flink.table.api.dataview.{ListView, MapView} + +/** + * State view utils to create [[StateListView]] or [[StateMapView]].. + */ +object StateViewUtils { --- End diff -- We may not need this, as we can code generate the creation code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134137708 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable class to the member area of the generated [[Function]]. +*/ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { +val field = + s""" + |transient ${clazz.getCanonicalName} $fieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + } + + /** +* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]]. +* +* @param indices indices of aggregate functions. +* @param ctxTerm field name of runtime context. +* @param accConfig data view config which contains id, field and StateDescriptos. +* @return statements to create [[MapView]] or [[ListView]]. +*/ + def addReusableDataViewConfig( + indices: Range, + ctxTerm: String, + accConfig: Option[DataViewConfig]) +: String = { +if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { + val initDataViews = new StringBuilder + val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs +.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) +.toMap[String, StateDescriptor[_, _]] + + for (i <- indices) yield { +for (spec <- accConfig.get.accSpecs(i)) yield { + val dataViewField = spec.field + val dataViewTypeTerm = dataViewField.getType.getCanonicalName + val desc = descMapping.getOrElse(spec.id, +throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + + val serializedData = AggregateUtil.serialize(desc) + val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview" + val field = +s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + |""".stripMargin + reusableMemberStatements.add(field) + + val descFieldTerm = s"${dataViewFieldTerm}_desc" + val descClassQualifier = 
classOf[StateDescriptor[_, _]].getCanonicalName + val descDeserialize = +s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} + | .deserialize("$serializedData"); + """.stripMargin + + val init = if (dataViewField.getType == classOf[MapView[_, _]]) { +s""" + |$descDeserialize + |$dataViewFieldTerm = + | org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm, + | $ctxTerm); + """.stripMargin + } else if (dataViewField.getType == classOf[ListView[_]]) { +s""" + |$descDeserialize + |$dataViewFieldTerm = + | org.apache.flink.table.dataview.StateViewUtils.createListView($descFieldTerm, --- End diff -- Same as above, we can code gen the creation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134134801 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -100,6 +107,11 @@ abstract class GeneratedAggregations extends Function { * aggregated results */ def resetAccumulator(accumulators: Row) + + /** +* Clean up for the accumulators. +*/ + def cleanUp() --- End diff -- `cleanup` is also a single word, so we do not need an upper-case `U` here.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134153349 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -80,7 +82,8 @@ class AggregationCodeGenerator( outputArity: Int, needRetract: Boolean, needMerge: Boolean, -needReset: Boolean) +needReset: Boolean, +accConfig: Option[DataViewConfig]) --- End diff -- It seems that `accConfig` is always `Some(x)`. Do we need the `Option`?
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118741 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView provides Map functionality for accumulators used by user-defined aggregate functions + * [[org.apache.flink.table.functions.AggregateFunction]]. + * + * A MapView can be backed by a Java HashMap or a state backend, depending on the context in + * which the function is used. + * + * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]] + * when use state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *public void accumulate(MyAccum accumulator, String id) { + * try { + * if (!accumulator.map.contains(id)) { + *accumulator.map.put(id, 1); + *accumulator.count++; + * } + * } catch (Exception e) { + *e.printStackTrace(); + * } + *} + * + *@Override + *public Long getValue(MyAccum accumulator) { + * return accumulator.count; + *} + * } + * + * }}} + * + * @param keyTypeInfo key type information + * @param valueTypeInfo value type information + * @tparam K key type + * @tparam V value type + */ +@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]]) +class MapView[K, V]( + private[flink] val keyTypeInfo: TypeInformation[K], + private[flink] val valueTypeInfo: TypeInformation[V]) --- End diff -- I'm not sure whether it is good to add `private[flink]`, because it is `public` for Java users actually. And please make them `@transient` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118741 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -105,6 +108,13 @@ class AggregationCodeGenerator( inFields => for (f <- inFields) yield javaClasses(f) } +// define runtimeContext as member variable +val ctxTerm = s"runtimeContext" --- End diff -- I think we do not need to make the `runtimeContext` a member variable.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134138469 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable class to the member area of the generated [[Function]]. +*/ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { +val field = + s""" + |transient ${clazz.getCanonicalName} $fieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + } + + /** +* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]]. +* +* @param indices indices of aggregate functions. +* @param ctxTerm field name of runtime context. +* @param accConfig data view config which contains id, field and StateDescriptos. +* @return statements to create [[MapView]] or [[ListView]]. +*/ + def addReusableDataViewConfig( --- End diff -- The `addReusableDataViewConfig` should be in `AggregateCodeGenerator`. And I would like to change this method to `addReusableDataView(spec: DataViewSpec): String`, the returned String is the dataview member variable term. And the dataview creation code can be added into `reusableDataViewStatements`. And the code of `reusableDataViewStatements` can be added to `initialize(ctx)` (or `open(ctx)` as I suggested) at last. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
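A rough sketch of the refactoring suggested in the comment above: one `addReusableDataView(spec)` call per data view that returns the member term and queues the creation statement, with the queued statements emitted into `open(ctx)` at the end. `ViewSpec`, `AggCodeGenSketch` and `genOpen` are hypothetical stand-ins for illustration, not the PR's actual classes, and the state creation code itself is left as a placeholder.

```scala
import scala.collection.mutable

// Hypothetical stand-in for DataViewSpec: only what the sketch needs.
final case class ViewSpec(accIndex: Int, fieldName: String, typeTerm: String)

class AggCodeGenSketch {
  // Member declarations of the generated Java class.
  val reusableMemberStatements = mutable.LinkedHashSet[String]()
  // Data view creation statements, emitted into open(ctx) at the end.
  val reusableDataViewStatements = mutable.LinkedHashSet[String]()

  /** Registers one data view and returns its member variable term. */
  def addReusableDataView(spec: ViewSpec): String = {
    val term = s"acc${spec.accIndex}_${spec.fieldName}_dataview"
    reusableMemberStatements += s"transient ${spec.typeTerm} $term = null;"
    // Placeholder: the real creation code would use the descriptor and ctx.
    reusableDataViewStatements += s"$term = null; // creation code elided"
    term
  }

  /** Renders the generated open(ctx) method from the queued statements. */
  def genOpen(ctxTerm: String): String = {
    val body = reusableDataViewStatements.mkString("\n  ")
    s"""public final void open(
       |    org.apache.flink.api.common.functions.RuntimeContext $ctxTerm) throws Exception {
       |  $body
       |}""".stripMargin
  }
}
```

With this shape the caller only keeps the returned term, and the generator decides where the creation code ends up.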
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134151559 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * when use state backend.. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + val list = new util.ArrayList[T]() + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = { +if (!list.isEmpty) { + list +} else { + null +} + } + + /** +* Adding the given value to the list. +* +* @param value element to be appended to this list +*/ + def add(value: T): Unit = list.add(value) + + /** +* Removes all of the elements from this list. +* +* The list will be empty after this call returns. +*/ + override def clear(): Unit = list.clear() + + /** +* Copy from a list instance. +* +* @param t List instance. +* @return A copy of this list instance +*/ + def copyFrom(t: util.List[T]): ListView[T] = { --- End diff -- the `copyFrom` method can be accessed by users. I should avoid this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134139167 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable class to the member area of the generated [[Function]]. +*/ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { --- End diff -- Do we need this? It is only used to add `RuntimeContext` to the member area, but `RuntimeContext` is only used in `initialize`.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118759 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView provides Map functionality for accumulators used by user-defined aggregate functions + * [[org.apache.flink.table.functions.AggregateFunction]]. + * + * A MapView can be backed by a Java HashMap or a state backend, depending on the context in + * which the function is used. + * + * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]] + * when use state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *public void accumulate(MyAccum accumulator, String id) { + * try { + * if (!accumulator.map.contains(id)) { + *accumulator.map.put(id, 1); + *accumulator.count++; + * } + * } catch (Exception e) { + *e.printStackTrace(); + * } + *} + * + *@Override + *public Long getValue(MyAccum accumulator) { + * return accumulator.count; + *} + * } + * + * }}} + * + * @param keyTypeInfo key type information + * @param valueTypeInfo value type information + * @tparam K key type + * @tparam V value type + */ +@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]]) +class MapView[K, V]( + private[flink] val keyTypeInfo: TypeInformation[K], + private[flink] val valueTypeInfo: TypeInformation[V]) + extends DataView { + + def this() = this(null, null) + + val map = new util.HashMap[K, V]() + + /** +* Returns the value to which the specified key is mapped, or { @code null } if this map +* contains no mapping for the key. +* +* @param key The key of the mapping. +* @return The value of the mapping with the given key. +* @throws Exception Thrown if the system cannot get data. +*/ + @throws[Exception] + def get(key: K): V = map.get(key) + + /** +* Put a value with the given key into the map. +* +* @param key The key of the mapping. +* @param value The new value of the mapping. +* @throws Exception Thrown if the system cannot put data. +*/ + @throws[Exception] + def put(key: K, value: V): Unit = map.put(key, value) + + /** +* Copies all of the mappings from the specified map to this map view. +* +* @param map The mappings to be stored in this map. +* @throws Exception Thrown if the system cannot access the map. 
+*/ + @throws[Exception] + def putAll(map: util.Map[K, V]): Unit = map.putAll(map) + + /** +* Deletes the mapping of the given key. +* +* @param key The key of the mapping. +* @throws Exception Thrown if the system cannot access the map. +*/ + @throws[Exception] + def remove(key: K): Unit = map.remove(key) + + /** +* Returns whether there exists the given mapping. +* +* @param key The key of the mapping. +* @return True
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134137525 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** +* Adds a reusable class to the member area of the generated [[Function]]. +*/ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { +val field = + s""" + |transient ${clazz.getCanonicalName} $fieldTerm = null; + |""".stripMargin +reusableMemberStatements.add(field) + } + + /** +* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]]. +* +* @param indices indices of aggregate functions. +* @param ctxTerm field name of runtime context. +* @param accConfig data view config which contains id, field and StateDescriptos. +* @return statements to create [[MapView]] or [[ListView]]. +*/ + def addReusableDataViewConfig( + indices: Range, + ctxTerm: String, + accConfig: Option[DataViewConfig]) +: String = { +if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { + val initDataViews = new StringBuilder + val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs +.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) +.toMap[String, StateDescriptor[_, _]] + + for (i <- indices) yield { +for (spec <- accConfig.get.accSpecs(i)) yield { + val dataViewField = spec.field + val dataViewTypeTerm = dataViewField.getType.getCanonicalName + val desc = descMapping.getOrElse(spec.id, +throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + + val serializedData = AggregateUtil.serialize(desc) + val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview" + val field = +s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + |""".stripMargin + reusableMemberStatements.add(field) + + val descFieldTerm = s"${dataViewFieldTerm}_desc" + val descClassQualifier = 
classOf[StateDescriptor[_, _]].getCanonicalName + val descDeserialize = +s""" + |$descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} + | .deserialize("$serializedData"); + """.stripMargin + + val init = if (dataViewField.getType == classOf[MapView[_, _]]) { +s""" + |$descDeserialize + |$dataViewFieldTerm = + | org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm, --- End diff -- I think we do not need the `StateViewUtils` here, we can create a MapView using code gen directly, because we already have the RuntimeContext and StateDescriptor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118408 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView provides List functionality for accumulators used by user-defined aggregate functions + * {{AggregateFunction}}. + * + * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in + * which the function is used. + * + * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]] + * when use state backend.. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + val list = new util.ArrayList[T]() --- End diff -- make the list private, other wise Java users can access it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
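A minimal sketch of the change requested above, assuming a simplified class named `SimpleListView` (hypothetical, without `DataView` or type information). A plain `private val` keeps the backing list out of the public API, so Java callers only see `get`, `add` and `clear`.

```scala
import java.lang.{Iterable => JIterable}
import java.util

// Hypothetical, simplified stand-in for ListView.
class SimpleListView[T] {
  // Plain `private`: no public accessor is exposed to Java callers.
  private val list = new util.ArrayList[T]()

  /** Returns the elements, or null if the list is empty (as in the diff). */
  def get: JIterable[T] = if (!list.isEmpty) list else null

  /** Appends the given value to the list. */
  def add(value: T): Unit = list.add(value)

  /** Removes all elements from the list. */
  def clear(): Unit = list.clear()
}
```

Note that a qualified modifier such as `private[flink]` would not give the same protection from Java, which is the concern raised for `MapView` in this thread.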
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118959 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -162,13 +172,66 @@ class AggregationCodeGenerator( } } +def genDataViewFieldSetter(accTerm: String, specs: Seq[DataViewSpec[_]]): String = { + if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { +val setters = for (spec <- specs) yield { + val field = spec.field + val dataViewTerm = s"${accTerm}_${field.getName}_dataview" + val fieldSetter = if (Modifier.isPublic(field.getModifiers)) { +s"$accTerm.${field.getName} = $dataViewTerm;" + } else { +val fieldTerm = addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName) +s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, dataViewTerm)};" + } + + s""" + |$fieldSetter +""".stripMargin --- End diff -- indent --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134152736 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) --- End diff -- Yes, I think we can use accumulator type info instead of field information. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118851 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { --- End diff -- Please mark `elementTypeInfo` as `@transient`. Also, do we want the type info to be accessible to users?
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134119014 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -162,13 +172,66 @@ class AggregationCodeGenerator( } } +def genDataViewFieldSetter(accTerm: String, specs: Seq[DataViewSpec[_]]): String = { + if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { +val setters = for (spec <- specs) yield { + val field = spec.field + val dataViewTerm = s"${accTerm}_${field.getName}_dataview" --- End diff -- I see that the data view term is defined in many places. Can we create a method that generates the term name, such as `createDataViewTerm(index: Int, fieldName: String)`?
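As a rough illustration of the suggestion above, the term naming could live in a single helper. This is only a sketch, written in Java rather than the code generator's Scala. The class name `DataViewTerms` and the `acc<index>_<field>_dataview` naming scheme are assumptions made for this example, not names taken from the pull request.

```java
// Hypothetical helper (not from the pull request): builds the generated-code
// term for a data view field in one place, so call sites cannot drift apart.
final class DataViewTerms {
    private DataViewTerms() {}

    // index: position of the aggregate function (assumed meaning)
    // fieldName: name of the accumulator field that holds the data view
    static String createDataViewTerm(int index, String fieldName) {
        return "acc" + index + "_" + fieldName + "_dataview";
    }
}
```

Every place that currently builds the string inline would then call the helper, so a change to the naming scheme touches one method only.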
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118744 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} +import java.util + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.MapViewTypeInfoFactory + +/** + * MapView provides Map functionality for accumulators used by user-defined aggregate functions + * [[org.apache.flink.table.functions.AggregateFunction]]. + * + * A MapView can be backed by a Java HashMap or a state backend, depending on the context in + * which the function is used. + * + * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]] + * when use state backend. 
+ * + * Example: + * {{{ + * + * public class MyAccum { + *public MapView map; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + *@Override + *public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.map = new MapView<>(Types.STRING, Types.INT); + * accum.count = 0L; + * return accum; + *} + * + *public void accumulate(MyAccum accumulator, String id) { + * try { + * if (!accumulator.map.contains(id)) { + *accumulator.map.put(id, 1); + *accumulator.count++; + * } + * } catch (Exception e) { + *e.printStackTrace(); + * } + *} + * + *@Override + *public Long getValue(MyAccum accumulator) { + * return accumulator.count; + *} + * } + * + * }}} + * + * @param keyTypeInfo key type information + * @param valueTypeInfo value type information + * @tparam K key type + * @tparam V value type + */ +@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]]) +class MapView[K, V]( + private[flink] val keyTypeInfo: TypeInformation[K], + private[flink] val valueTypeInfo: TypeInformation[V]) + extends DataView { + + def this() = this(null, null) + + val map = new util.HashMap[K, V]() --- End diff -- please make the `map` private
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118240 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -108,30 +112,38 @@ object AggregateUtil { outputArity, needRetract = false, needMerge = false, - needReset = false + needReset = false, + accConfig = Some(DataViewConfig(accSpecs, isUseState)) ) +val accConfig = accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + if (isRowTimeType) { if (isRowsClause) { // ROWS unbounded over process function new RowTimeUnboundedRowsOver( genFunction, aggregationStateType, CRowTypeInfo(inputTypeInfo), - queryConfig) + queryConfig, + accConfig) --- End diff -- +1 to do this. A great improvement!
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134118203 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") --- End diff -- Adding `private[flink]` to the key and value type infos will make them public in Java. I'm not sure whether that is a good idea. I would prefer not to expose them to (Java) users, and the reflection only happens during compilation, which I think is fine.
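For readers unfamiliar with the reflection approach discussed above, here is a minimal Java sketch of reading a field that is deliberately not exposed to users. The classes `PrivateInfoView` and `FieldReader` are hypothetical stand-ins invented for this example. Only the method name `getFieldValue` mirrors the call in the quoted diff, and its body here is an assumption, not the pull request's implementation.

```java
import java.lang.reflect.Field;

// Hypothetical view class: the type info is private, so users cannot read it.
class PrivateInfoView {
    private final String keyTypeInfo;

    PrivateInfoView(String keyTypeInfo) {
        this.keyTypeInfo = keyTypeInfo;
    }
}

final class FieldReader {
    private FieldReader() {}

    // Reads a declared (possibly private) field of obj via reflection.
    // Checked reflection errors are wrapped in an unchecked exception.
    static Object getFieldValue(Class<?> clazz, Object obj, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(obj);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }
}
```

The trade-off is the one raised in the comment: the field stays hidden from user code, at the cost of a reflective lookup that only runs once while the query is being set up.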
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133925176 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -179,13 +214,19 @@ class AggregationCodeGenerator( | ${aggMapping(i)}, | (${accTypes(i)}) accs.getField($i));""".stripMargin } else { +val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) { --- End diff -- Yes, it makes sense.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133925097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -405,6 +481,17 @@ class AggregationCodeGenerator( } } +val aggFuncCode = Seq( + genSetAggregationResults, + genAccumulate, + genRetract, + genCreateAccumulators, + genSetForwardedFields, + genSetConstantFlags, + genCreateOutputRow, + genMergeAccumulatorsPair, + genResetAccumulator).mkString("\n") --- End diff -- It makes sense. I have looked at ProcessFunctionWithCleanupState; the cleanUp should be called whenever cleanupState is called.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133924725 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { --- End diff -- Yes, CountDistinct is only used for the test cases here.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130655818 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!") --- End diff -- We try to implement public interfaces in a way that Java users do not need to deal with Scala classes. However, it should be fine in this case because 1. Scala's `UnsupportedOperationException` is defined as Java's `UnsupportedOperationException` 2. the exception will be removed if we implement the method by default as backed by an ArrayList.
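The second point above, a default implementation backed by an ArrayList instead of methods that throw, could look roughly like the following Java sketch. `HeapBackedListView` is a hypothetical stand-in written for this example. It is not the `ListView` class from the pull request, and it omits the type information and the `DataView` parent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ListView: the default behaviour is backed by a
// plain ArrayList, so no method needs to throw UnsupportedOperationException.
class HeapBackedListView<T> {
    private final List<T> list = new ArrayList<>();

    // Returns the elements added so far (an empty iterable if there are none).
    Iterable<T> get() {
        return list;
    }

    // Appends the given value to the list.
    void add(T value) {
        list.add(value);
    }

    // Removes all elements.
    void clear() {
        list.clear();
    }
}
```

With such a default, the same accumulator code works on the heap as written, and a state-backed variant only needs to override these methods.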
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130655010 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { --- End diff -- Hmm, not sure how many users would actually look into the definition of `ListView` and also recognize the interface as a state interface. Such users would probably also see the Java/ScalaDocs that explain the purpose of the class. Anyway, I have no strong opinion about this and I'm OK to keep it as it is.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130653790 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) --- End diff -- OK, then let's keep it as it is
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130628208 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!") + + /** +* Adding the given value to the list. +* +* @param value element to be appended to this list +*/ + def add(value: T): Unit = throw new UnsupportedOperationException("Unsupported operation!") --- End diff -- I think it's a good idea and I will try it.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130625494 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,119 @@ object UserDefinedFunctionUtils { // -- /** +* get data view type information from accumulator constructor. +* +* @param aggFun aggregate function +* @return the data view specification +*/ + def getDataViewTypeInfoFromConstructor( +aggFun: AggregateFunction[_, _]) + : mutable.HashMap[String, TypeInformation[_]] = { + +val resultMap = new mutable.HashMap[String, TypeInformation[_]] +val acc = aggFun.createAccumulator() +val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true) +for (i <- 0 until fields.size()) { + val field = fields.get(i) + field.setAccessible(true) + if (classOf[DataView].isAssignableFrom(field.getType)) { +if (field.getType == classOf[MapView[_, _]]) { + val mapView = field.get(acc) + val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo") --- End diff -- Yes, it's a good idea.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130612699 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { + + def this() = this(null) + + /** +* Returns an iterable of the list. +* +* @return The iterable of the list or { @code null} if the list is empty. +*/ + def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!") --- End diff -- There is no reference to the java.lang package, so it is a Scala UnsupportedOperationException. What's the difference?
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130611417 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) +class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView { --- End diff -- IMO, this will make ListView look more like a state interface. Although the interface of ListView is the same as that of ListState, we do not want users to think that it is state.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130606340 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. + * + * All methods in this class are not implemented, users do not need to care about whether it is + * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a + * {@link HeapListView}. + * + * + * NOTE: Users are not recommended to extends this class. 
+ * + * + * Example: + * {{{ + * + * public class MyAccum { + *public ListView list; + *public long count; + * } + * + * public class MyAgg extends AggregateFunction { + * + * @Override + * public MyAccum createAccumulator() { + * MyAccum accum = new MyAccum(); + * accum.list = new ListView<>(Types.STRING); + * accum.count = 0L; + * return accum; + * } + * + * //Overloaded accumulate method + * public void accumulate(MyAccum accumulator, String id) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * } + * + * @Override + * public Long getValue(MyAccum accumulator) { + * accumulator.list.add(id); + * ... ... + * accumulator.get() + * ... ... + * return accumulator.count; + * } + * } + * + * }}} + * + * @param elementTypeInfo element type information + * @tparam T element type + */ +@TypeInfo(classOf[ListViewTypeInfoFactory[_]]) --- End diff -- I tried that, and it fails to compile with the error "not found: type T".
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130461621 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -405,6 +481,17 @@ class AggregationCodeGenerator( } } +val aggFuncCode = Seq( + genSetAggregationResults, + genAccumulate, + genRetract, + genCreateAccumulators, + genSetForwardedFields, + genSetConstantFlags, + genCreateOutputRow, + genMergeAccumulatorsPair, + genResetAccumulator).mkString("\n") --- End diff -- I think we need an additional `cleanUp()` method that clears all state objects for the current key. Otherwise, we will have memory leaks. The `cleanUp()` method must be called when the state retention timers trigger.
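To make the leak concrete, here is a minimal sketch in plain Java with no Flink dependency. `SketchKeyedListState`, `CleanableAggregations`, and `CleanUpDemo` are hypothetical names, not Flink classes. The sketch assumes that state is scoped to a current key and that a retention timer eventually calls `cleanUp()` for the key it fired on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed list state: one list per key, scoped by a "current key".
// This only imitates the idea of keyed state; it is not a Flink API.
final class SketchKeyedListState {
    private final Map<String, List<String>> perKey = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) { currentKey = key; }

    void add(String value) {
        perKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    // Drops only the entries that belong to the current key.
    void clear() { perKey.remove(currentKey); }

    int keysHeld() { return perKey.size(); }
}

// Plays the role of the generated aggregation class in this sketch.
final class CleanableAggregations {
    final SketchKeyedListState ids = new SketchKeyedListState();
    final SketchKeyedListState tags = new SketchKeyedListState();

    void setCurrentKey(String key) {
        ids.setCurrentKey(key);
        tags.setCurrentKey(key);
    }

    void accumulate(String id, String tag) {
        ids.add(id);
        tags.add(tag);
    }

    // Assumed to be invoked when the retention timer of the current key fires:
    // every state object backing a data view is cleared for that key.
    void cleanUp() {
        ids.clear();
        tags.clear();
    }
}

final class CleanUpDemo {
    public static void main(String[] args) {
        CleanableAggregations agg = new CleanableAggregations();
        agg.setCurrentKey("user-1");
        agg.accumulate("a", "x");
        agg.setCurrentKey("user-2");
        agg.accumulate("b", "y");

        // Simulated retention timer for "user-1".
        agg.setCurrentKey("user-1");
        agg.cleanUp();
        System.out.println("keys still held: " + agg.ids.keysHeld());
    }
}
```

Without the `cleanUp()` call, entries for keys that never receive another record would stay in the state objects indefinitely.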
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130458506 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -179,13 +214,19 @@ class AggregationCodeGenerator( | ${aggMapping(i)}, | (${accTypes(i)}) accs.getField($i));""".stripMargin } else { +val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) { --- End diff -- This will create new `MapState` (or `ListState`) objects in every invocation of `setAggregationResults()`. I think we can make the state objects members of the `GeneratedAggregations` class and just set them into the accumulator.
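The reuse suggested here could look roughly like the following sketch. It is plain Java without Flink; `SketchListState`, `SketchAccumulator`, `ReusingAggregations`, and `ReuseDemo` are hypothetical stand-ins for the state handle, the user accumulator, and the generated aggregation class. The `created` counter exists only to show how many handles get constructed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a state handle such as ListState; not a Flink API.
final class SketchListState {
    static int created = 0;                       // number of handles constructed so far
    final List<String> values = new ArrayList<>();

    SketchListState() { created++; }
}

// Hypothetical accumulator whose view field is filled in by the generated code.
final class SketchAccumulator {
    SketchListState list;
    long count;
}

// Plays the role of the generated aggregation class in this sketch.
final class ReusingAggregations {
    // Constructed once and kept as a member, instead of once per method call.
    private final SketchListState listState = new SketchListState();

    void accumulate(SketchAccumulator acc, String id) {
        acc.list = listState;                     // only a field assignment per record
        acc.list.values.add(id);
        acc.count++;
    }

    long getValue(SketchAccumulator acc) {
        acc.list = listState;                     // same member object is set again
        return acc.count;
    }
}

final class ReuseDemo {
    public static void main(String[] args) {
        ReusingAggregations agg = new ReusingAggregations();
        SketchAccumulator acc = new SketchAccumulator();
        for (int i = 0; i < 1000; i++) {
            agg.accumulate(acc, "id-" + i);
        }
        System.out.println(agg.getValue(acc) + " records, "
            + SketchListState.created + " state handle(s) created");
    }
}
```

The same pattern would apply to `accumulate()`, `retract()`, and `resetAccumulator()`: each method only assigns the member into the accumulator rather than constructing a new object.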
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130459674 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -222,14 +271,22 @@ class AggregationCodeGenerator( j""" | public final void retract( |org.apache.flink.types.Row accs, - |org.apache.flink.types.Row input)""".stripMargin + |org.apache.flink.types.Row input) throws Exception """.stripMargin val retract: String = { -for (i <- aggs.indices) yield +for (i <- aggs.indices) yield { + val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) { +genDataViewFieldSetter(s"acc$i", accConfig.get.accSpecs(i)) --- End diff -- Same as for `setAggregationResults()`. I think we should reuse the state objects.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130461181 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -386,13 +455,20 @@ class AggregationCodeGenerator( val sig: String = j""" | public final void resetAccumulator( - |org.apache.flink.types.Row accs)""".stripMargin + |org.apache.flink.types.Row accs) throws Exception """.stripMargin val reset: String = { -for (i <- aggs.indices) yield +for (i <- aggs.indices) yield { + val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) { +genDataViewFieldSetter(s"acc$i", accConfig.get.accSpecs(i)) --- End diff -- This should reuse the state objects as well.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130459631 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -201,14 +242,22 @@ class AggregationCodeGenerator( j""" | public final void accumulate( |org.apache.flink.types.Row accs, - |org.apache.flink.types.Row input)""".stripMargin + |org.apache.flink.types.Row input) throws Exception """.stripMargin val accumulate: String = { -for (i <- aggs.indices) yield +for (i <- aggs.indices) yield { + val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) { +genDataViewFieldSetter(s"acc$i", accConfig.get.accSpecs(i)) --- End diff -- Same as for `setAggregationResults()`. I think we should reuse the state objects.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130053061 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.dataview + +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation} +import org.apache.flink.table.dataview.ListViewTypeInfoFactory + +/** + * ListView encapsulates the operation of list. --- End diff -- ``` ListView provides List functionality for accumulators used by user-defined aggregate functions {{AggregateFunction}}. A ListView can be backed by a Java ArrayList or a state backend, depending on the context in which the function is used. At runtime `ListView` will be replaced by a {@link StateListView} or a {@link HeapListView}. Hence, the methods of `ListView` do not need to be implemented. ```
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r130119984 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -19,13 +19,20 @@ package org.apache.flink.table.runtime.aggregate import org.apache.flink.api.common.functions.Function +import org.apache.flink.table.dataview.{DataViewFactory, HeapViewFactory} import org.apache.flink.types.Row /** * Base class for code-generated aggregations. */ abstract class GeneratedAggregations extends Function { + var factory: DataViewFactory = new HeapViewFactory() + + def getDataViewFactory: DataViewFactory = factory + + def setDataViewFactory(factory: DataViewFactory): Unit = this.factory = factory --- End diff -- I think we should rather add a method `initialize(ctx: RuntimeContext)` and generate code to register the state in this method. IMO, the `DataViewFactory` is also not required, because (1) we can code-gen all of that functionality, and (2) we can make heap the default for `MapView` and `ListView`, such that we only need to replace it if it needs to be backed by state. So there would be only one implementation of `DataViewFactory`.
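One way to read this proposal, as a sketch under assumptions rather than the eventual implementation: the view is heap-backed by default, and `initialize` swaps in a state-backed replacement only when needed. All class names below are hypothetical, `SketchRuntimeContext` merely stands in for `RuntimeContext`, the state name `acc0_list` is made up, and the `useState` flag is an illustrative shortcut for a decision the code generator would make.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical context that hands out named list state; stands in for RuntimeContext.
final class SketchRuntimeContext {
    private final Map<String, List<String>> store = new HashMap<>();

    // Registers the state on first access and returns the same list afterwards.
    List<String> getListState(String name) {
        return store.computeIfAbsent(name, n -> new ArrayList<>());
    }

    boolean hasState(String name) { return store.containsKey(name); }
}

// Heap-backed by default, so no factory is needed when no state is involved.
class SketchListView {
    private final List<String> heap = new ArrayList<>();

    void add(String value) { heap.add(value); }

    Iterable<String> get() { return heap; }
}

// Replacement that is used only when the view has to live in state.
final class SketchStateListView extends SketchListView {
    private final List<String> state;

    SketchStateListView(List<String> state) { this.state = state; }

    @Override void add(String value) { state.add(value); }

    @Override Iterable<String> get() { return state; }
}

// Plays the role of the generated aggregation class in this sketch.
final class SketchAggregations {
    SketchListView view = new SketchListView();   // heap default

    // Called once before any record is processed; registers state if required.
    void initialize(SketchRuntimeContext ctx, boolean useState) {
        if (useState) {
            view = new SketchStateListView(ctx.getListState("acc0_list"));
        }
    }
}

final class InitializeDemo {
    public static void main(String[] args) {
        SketchRuntimeContext ctx = new SketchRuntimeContext();
        SketchAggregations agg = new SketchAggregations();
        agg.initialize(ctx, true);
        agg.view.add("id-1");
        System.out.println("state registered: " + ctx.hasState("acc0_list"));
    }
}
```

With this shape, the heap path needs no setup, and the state path is a single assignment made inside `initialize`.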