twalthr commented on a change in pull request #13143:
URL: https://github.com/apache/flink/pull/13143#discussion_r470515620
##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -1268,693 +1044,770 @@ class WeightedAvg extends AggregateFunction[JLong,
CountAccumulator] {
}
}
- def resetAccumulator(acc: WeightedAvgAccum): Unit = {
+ def resetAccumulator(acc: WeightedAvgAccumulator): Unit = {
acc.count = 0
acc.sum = 0L
}
+}
- override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
- new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)
- }
+val env = TableEnvironment.create(...)
- override def getResultType: TypeInformation[JLong] = Types.LONG
-}
+// call function "inline" without registration in Table API
+env
+ .from("MyTable")
+ .groupBy($"myField")
+ .select($"myField", call(classOf[WeightedAvg], $"value", $"weight"))
// register function
-val tEnv: StreamTableEnvironment = ???
-tEnv.registerFunction("wAvg", new WeightedAvg())
+env.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])
-// use function
-tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores
GROUP BY user")
+// call registered function in Table API
+env
+ .from("MyTable")
+ .groupBy($"myField")
+ .select($"myField", call("WeightedAvg", $"value", $"weight"))
+// call registered function in SQL
+env.sqlQuery(
+ "SELECT myField, WeightedAvg(value, weight) FROM MyTable GROUP BY myField"
+)
{% endhighlight %}
</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-'''
-Java code:
-
-/**
- * Accumulator for WeightedAvg.
- */
-public static class WeightedAvgAccum {
- public long sum = 0;
- public int count = 0;
-}
-
-// The java class must have a public no-argument constructor and can be
founded in current java classloader.
-
-/**
- * Weighted Average user-defined aggregate function.
- */
-public static class WeightedAvg extends AggregateFunction<Long,
WeightedAvgAccum> {
-
- @Override
- public WeightedAvgAccum createAccumulator() {
- return new WeightedAvgAccum();
- }
-
- @Override
- public Long getValue(WeightedAvgAccum acc) {
- if (acc.count == 0) {
- return null;
- } else {
- return acc.sum / acc.count;
- }
- }
-
- public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
- acc.sum += iValue * iWeight;
- acc.count += iWeight;
- }
-
- public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
- acc.sum -= iValue * iWeight;
- acc.count -= iWeight;
- }
-
- public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
- Iterator<WeightedAvgAccum> iter = it.iterator();
- while (iter.hasNext()) {
- WeightedAvgAccum a = iter.next();
- acc.count += a.count;
- acc.sum += a.sum;
- }
- }
-
- public void resetAccumulator(WeightedAvgAccum acc) {
- acc.count = 0;
- acc.sum = 0L;
- }
-}
-'''
-
-# register function
-t_env = ... # type: StreamTableEnvironment
-t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
+</div>
-# use function
-t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores
GROUP BY user")
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs.
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been
accumulated. In our example, we
+define a class `WeightedAvgAccumulator` to be the accumulator. Accumulators
are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to
ensure exactly-once semantics.
-{% endhighlight %}
-</div>
-</div>
+### Mandatory and Optional Methods
-{% top %}
+**The following methods are mandatory for each `AggregateFunction`:**
-Table Aggregation Functions
----------------------
+- `createAccumulator()`
+- `accumulate(...)`
+- `getValue(...)`
+
+Additionally, there are a few methods that can be optionally implemented.
While some of these methods
+allow the system more efficient query execution, others are mandatory for
certain use cases. For instance,
+the `merge(...)` method is mandatory if the aggregation function should be
applied in the context of a
+session group window (the accumulators of two session windows need to be
joined when a row is observed
+that "connects" them).
-User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or
more rows with one or more attributes) to a result table with multi rows and
columns.
+**The following methods of `AggregateFunction` are required depending on the
use case:**
-<center>
-<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png"
width="80%">
-</center>
+- `retract(...)` is required for aggregations on `OVER` windows.
+- `merge(...)` is required for many bounded aggregations and session window
aggregations.
-The above figure shows an example of a table aggregation. Assume you have a
table that contains data about beverages. The table consists of three columns,
`id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest
prices of all beverages in the table, i.e., perform a `top2()` table
aggregation. You would need to check each of the 5 rows and the result would be
a table with the top 2 values.
+If the aggregate function can only be applied in an OVER window, this can be
declared by returning the
+requirement `FunctionRequirement.OVER_WINDOW_ONLY` in `getRequirements()`.
-User-defined table aggregation functions are implemented by extending the
`TableAggregateFunction` class. A `TableAggregateFunction` works as follows.
First, it needs an `accumulator`, which is the data structure that holds the
intermediate result of the aggregation. An empty accumulator is created by
calling the `createAccumulator()` method of the `TableAggregateFunction`.
Subsequently, the `accumulate()` method of the function is called for each
input row to update the accumulator. Once all rows have been processed, the
`emitValue()` method of the function is called to compute and return the final
results.
+If an accumulator needs to store large amounts of data,
`org.apache.flink.table.api.dataview.ListView`
+and `org.apache.flink.table.api.dataview.MapView` provide advanced features
for leveraging Flink's state
+backends in unbounded data scenarios. Please see the docs of the corresponding
classes for more information
+about this advanced feature.
-**The following methods are mandatory for each `TableAggregateFunction`:**
+Since some of methods are optional or can be overloaded, the methods are
called by generated code. The
+base class does not always provide a signature to be overridden by the
concrete implementation class. Nevertheless,
+all mentioned methods must be declared publicly, not static, and named exactly
as the names mentioned above
+to be called.
-- `createAccumulator()`
-- `accumulate()`
+Detailed documentation for all methods that are not declared in
`AggregateFunction` and called by generated
+code is given below.
-Flink's type extraction facilities can fail to identify complex data types,
e.g., if they are not basic types or simple POJOs. So similar to
`ScalarFunction` and `TableFunction`, `TableAggregateFunction` provides methods
to specify the `TypeInformation` of the result type (through
- `TableAggregateFunction#getResultType()`) and the type of the accumulator
(through `TableAggregateFunction#getAccumulatorType()`).
-
-Besides the above methods, there are a few contracted methods that can be
-optionally implemented. While some of these methods allow the system more
efficient query execution, others are mandatory for certain use cases. For
instance, the `merge()` method is mandatory if the aggregation function should
be applied in the context of a session group window (the accumulators of two
session windows need to be joined when a row is observed that "connects" them).
+**`accumulate(...)`**
+<div class="codetabs" markdown="1">
-**The following methods of `TableAggregateFunction` are required depending on
the use case:**
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Processes the input values and updates the provided accumulator instance.
The method
+ * accumulate can be overloaded with different custom types and arguments. An
aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator the accumulator which contains the current
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new
arrived data).
+ */
+public void accumulate(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
-- `retract()` is required for aggregations on bounded `OVER` windows.
-- `merge()` is required for many batch aggregations and session window
aggregations.
-- `resetAccumulator()` is required for many batch aggregations.
-- `emitValue()` is required for batch and window aggregations.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Processes the input values and updates the provided accumulator instance.
The method
+ * accumulate can be overloaded with different custom types and arguments. An
aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator the accumulator which contains the current
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new
arrived data).
+ */
+def accumulate(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
-**The following methods of `TableAggregateFunction` are used to improve the
performance of streaming jobs:**
+</div>
-- `emitUpdateWithRetract()` is used to emit values that have been updated
under retract mode.
+**`retract(...)`**
+<div class="codetabs" markdown="1">
-For `emitValue` method, it emits full data according to the accumulator. Take
TopN as an example, `emitValue` emit all top n values each time. This may bring
performance problems for streaming jobs. To improve the performance, a user can
also implement `emitUpdateWithRetract` method to improve the performance. The
method outputs data incrementally in retract mode, i.e., once there is an
update, we have to retract old records before sending new updated ones. The
method will be used in preference to the `emitValue` method if they are all
defined in the table aggregate function, because `emitUpdateWithRetract` is
treated to be more efficient than `emitValue` as it can output values
incrementally.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Retracts the input values from the accumulator instance. The current design
assumes the
+ * inputs are the values that have been previously accumulated. The method
retract can be
+ * overloaded with different custom types and arguments. This method must be
implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator the accumulator which contains the current
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new
arrived data).
+ */
+public void retract(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
-All methods of `TableAggregateFunction` must be declared as `public`, not
`static` and named exactly as the names mentioned above. The methods
`createAccumulator`, `getResultType`, and `getAccumulatorType` are defined in
the parent abstract class of `TableAggregateFunction`, while others are
contracted methods. In order to define a table aggregate function, one has to
extend the base class `org.apache.flink.table.functions.TableAggregateFunction`
and implement one (or more) `accumulate` methods. The method `accumulate` can
be overloaded with different parameter types and supports variable arguments.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Retracts the input values from the accumulator instance. The current design
assumes the
+ * inputs are the values that have been previously accumulated. The method
retract can be
+ * overloaded with different custom types and arguments. This method must be
implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator the accumulator which contains the current
aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new
arrived data).
+ */
+def retract(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
-Detailed documentation for all methods of `TableAggregateFunction` is given
below.
+</div>
+**`merge(...)`**
<div class="codetabs" markdown="1">
+
<div data-lang="java" markdown="1">
{% highlight java %}
-
-/**
- * Base class for user-defined aggregates and table aggregates.
- *
- * @param <T> the type of the aggregation result.
- * @param <ACC> the type of the aggregation accumulator. The accumulator is
used to keep the
- * aggregated values which are needed to compute an aggregation
result.
- */
-public abstract class UserDefinedAggregateFunction<T, ACC> extends
UserDefinedFunction {
-
- /**
- * Creates and init the Accumulator for this (table)aggregate function.
- *
- * @return the accumulator with the initial value
- */
- public ACC createAccumulator(); // MANDATORY
-
- /**
- * Returns the TypeInformation of the (table)aggregate function's result.
- *
- * @return The TypeInformation of the (table)aggregate function's result or
null if the result
- * type should be automatically inferred.
- */
- public TypeInformation<T> getResultType = null; // PRE-DEFINED
-
- /**
- * Returns the TypeInformation of the (table)aggregate function's
accumulator.
- *
- * @return The TypeInformation of the (table)aggregate function's
accumulator or null if the
- * accumulator type should be automatically inferred.
- */
- public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
-}
-
-/**
- * Base class for table aggregation functions.
- *
- * @param <T> the type of the aggregation result
- * @param <ACC> the type of the aggregation accumulator. The accumulator is
used to keep the
- * aggregated values which are needed to compute a table
aggregation result.
- * TableAggregateFunction represents its state using accumulator,
thereby the state of
- * the TableAggregateFunction must be put into the accumulator.
- */
-public abstract class TableAggregateFunction<T, ACC> extends
UserDefinedAggregateFunction<T, ACC> {
-
- /** Processes the input values and update the provided accumulator instance.
The method
- * accumulate can be overloaded with different custom types and arguments.
A TableAggregateFunction
- * requires at least one accumulate() method.
- *
- * @param accumulator the accumulator which contains the current
aggregated results
- * @param [user defined inputs] the input value (usually obtained from a
new arrived data).
- */
- public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY
-
- /**
- * Retracts the input values from the accumulator instance. The current
design assumes the
- * inputs are the values that have been previously accumulated. The method
retract can be
- * overloaded with different custom types and arguments. This function must
be implemented for
- * datastream bounded over aggregate.
- *
- * @param accumulator the accumulator which contains the current
aggregated results
- * @param [user defined inputs] the input value (usually obtained from a
new arrived data).
- */
- public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL
-
- /**
- * Merges a group of accumulator instances into one accumulator instance.
This function must be
- * implemented for datastream session window grouping aggregate and dataset
grouping aggregate.
- *
- * @param accumulator the accumulator which will keep the merged aggregate
results. It should
- * be noted that the accumulator may contain the
previous aggregated
- * results. Therefore user should not replace or clean
this instance in the
- * custom merge method.
- * @param its an {@link java.lang.Iterable} pointed to a group of
accumulators that will be
- * merged.
- */
- public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
-
- /**
- * Called every time when an aggregation result should be materialized. The
returned value
- * could be either an early and incomplete result (periodically emitted as
data arrive) or
- * the final result of the aggregation.
- *
- * @param accumulator the accumulator which contains the current
- * aggregated results
- * @param out the collector used to output data
- */
- public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
-
- /**
- * Called every time when an aggregation result should be materialized. The
returned value
- * could be either an early and incomplete result (periodically emitted as
data arrive) or
- * the final result of the aggregation.
- *
- * Different from emitValue, emitUpdateWithRetract is used to emit values
that have been updated.
- * This method outputs data incrementally in retract mode, i.e., once there
is an update, we
- * have to retract old records before sending new updated ones. The
emitUpdateWithRetract
- * method will be used in preference to the emitValue method if both
methods are defined in the
- * table aggregate function, because the method is treated to be more
efficient than emitValue
- * as it can outputvalues incrementally.
- *
- * @param accumulator the accumulator which contains the current
- * aggregated results
- * @param out the retractable collector used to output data. Use
collect method
- * to output(add) records and use retract method to
retract(delete)
- * records.
- */
- public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T>
out); // OPTIONAL
-
- /**
- * Collects a record and forwards it. The collector can output retract
messages with the retract
- * method. Note: only use it in {@code emitRetractValueIncrementally}.
- */
- public interface RetractableCollector<T> extends Collector<T> {
-
- /**
- * Retract a record.
- *
- * @param record The record to retract.
- */
- void retract(T record);
- }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This
method must be
+ * implemented for unbounded session window grouping aggregates and bounded
grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate
results. It should
+ * be noted that the accumulator may contain the previous
aggregated
+ * results. Therefore user should not replace or clean this
instance in the
+ * custom merge method.
+ * param: iterable an java.lang.Iterable pointed to a group of accumulators
that will be
+ * merged.
+ */
+public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-/**
- * Base class for user-defined aggregates and table aggregates.
- *
- * @tparam T the type of the aggregation result.
- * @tparam ACC the type of the aggregation accumulator. The accumulator is
used to keep the
- * aggregated values which are needed to compute an aggregation
result.
- */
-abstract class UserDefinedAggregateFunction[T, ACC] extends
UserDefinedFunction {
-
- /**
- * Creates and init the Accumulator for this (table)aggregate function.
- *
- * @return the accumulator with the initial value
- */
- def createAccumulator(): ACC // MANDATORY
-
- /**
- * Returns the TypeInformation of the (table)aggregate function's result.
- *
- * @return The TypeInformation of the (table)aggregate function's result or
null if the result
- * type should be automatically inferred.
- */
- def getResultType: TypeInformation[T] = null // PRE-DEFINED
-
- /**
- * Returns the TypeInformation of the (table)aggregate function's
accumulator.
- *
- * @return The TypeInformation of the (table)aggregate function's
accumulator or null if the
- * accumulator type should be automatically inferred.
- */
- def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
-}
-
-/**
- * Base class for table aggregation functions.
- *
- * @tparam T the type of the aggregation result
- * @tparam ACC the type of the aggregation accumulator. The accumulator is
used to keep the
- * aggregated values which are needed to compute an aggregation
result.
- * TableAggregateFunction represents its state using accumulator,
thereby the state of
- * the TableAggregateFunction must be put into the accumulator.
- */
-abstract class TableAggregateFunction[T, ACC] extends
UserDefinedAggregateFunction[T, ACC] {
-
- /**
- * Processes the input values and update the provided accumulator instance.
The method
- * accumulate can be overloaded with different custom types and arguments.
A TableAggregateFunction
- * requires at least one accumulate() method.
- *
- * @param accumulator the accumulator which contains the current
aggregated results
- * @param [user defined inputs] the input value (usually obtained from a
new arrived data).
- */
- def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY
-
- /**
- * Retracts the input values from the accumulator instance. The current
design assumes the
- * inputs are the values that have been previously accumulated. The method
retract can be
- * overloaded with different custom types and arguments. This function must
be implemented for
- * datastream bounded over aggregate.
- *
- * @param accumulator the accumulator which contains the current
aggregated results
- * @param [user defined inputs] the input value (usually obtained from a
new arrived data).
- */
- def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL
-
- /**
- * Merges a group of accumulator instances into one accumulator instance.
This function must be
- * implemented for datastream session window grouping aggregate and dataset
grouping aggregate.
- *
- * @param accumulator the accumulator which will keep the merged aggregate
results. It should
- * be noted that the accumulator may contain the
previous aggregated
- * results. Therefore user should not replace or clean
this instance in the
- * custom merge method.
- * @param its an [[java.lang.Iterable]] pointed to a group of
accumulators that will be
- * merged.
- */
- def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL
-
- /**
- * Called every time when an aggregation result should be materialized. The
returned value
- * could be either an early and incomplete result (periodically emitted as
data arrive) or
- * the final result of the aggregation.
- *
- * @param accumulator the accumulator which contains the current
- * aggregated results
- * @param out the collector used to output data
- */
- def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL
-
- /**
- * Called every time when an aggregation result should be materialized. The
returned value
- * could be either an early and incomplete result (periodically emitted as
data arrive) or
- * the final result of the aggregation.
- *
- * Different from emitValue, emitUpdateWithRetract is used to emit values
that have been updated.
- * This method outputs data incrementally in retract mode, i.e., once there
is an update, we
- * have to retract old records before sending new updated ones. The
emitUpdateWithRetract
- * method will be used in preference to the emitValue method if both
methods are defined in the
- * table aggregate function, because the method is treated to be more
efficient than emitValue
- * as it can outputvalues incrementally.
- *
- * @param accumulator the accumulator which contains the current
- * aggregated results
- * @param out the retractable collector used to output data. Use
collect method
- * to output(add) records and use retract method to
retract(delete)
- * records.
- */
- def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]):
Unit // OPTIONAL
-
- /**
- * Collects a record and forwards it. The collector can output retract
messages with the retract
- * method. Note: only use it in `emitRetractValueIncrementally`.
- */
- trait RetractableCollector[T] extends Collector[T] {
-
- /**
- * Retract a record.
- *
- * @param record The record to retract.
- */
- def retract(record: T): Unit
- }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This
method must be
+ * implemented for unbounded session window grouping aggregates and bounded
grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate
results. It should
+ * be noted that the accumulator may contain the previous
aggregated
+ * results. Therefore user should not replace or clean this
instance in the
+ * custom merge method.
+ * param: iterable an java.lang.Iterable pointed to a group of accumulators
that will be
+ * merged.
+ */
+def merge(accumulator: ACC, iterable: java.lang.Iterable[ACC]): Unit
{% endhighlight %}
</div>
+
</div>
+{% top %}
+
+Table Aggregate Functions
+-------------------------
+
+A user-defined table aggregate function (_UDTAGG_) maps scalar values of
multiple rows to zero, one,
+or multiple rows. The returned record may consist of one or more fields. If an
output row consists of
+only one field, the row can be omitted and a scalar value can be emitted. It
will be wrapped into an
+implicit row by the runtime.
Review comment:
I will replace `and` with `that` and fix the `an`.
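
The `accumulate(...)`, `retract(...)`, and `merge(...)` contract documented in the hunk above can be illustrated with a minimal plain-Java sketch. It deliberately does not extend Flink's `AggregateFunction`, so it runs without any Flink dependency; the class name `WeightedAvgSketch` and the `main` driver are assumptions made for illustration and are not part of the PR.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch: mirrors the method shapes described in the
// docs, but is NOT a Flink AggregateFunction and is not taken from the PR.
public class WeightedAvgSketch {

    /** Mutable accumulator holding the weighted sum and the total weight. */
    public static class WeightedAvgAccumulator {
        public long sum = 0L;
        public int count = 0;
    }

    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    /** Adds one (value, weight) pair to the accumulator. */
    public void accumulate(WeightedAvgAccumulator acc, long value, int weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    /** Undoes a previous accumulate call with the same inputs. */
    public void retract(WeightedAvgAccumulator acc, long value, int weight) {
        acc.sum -= value * weight;
        acc.count -= weight;
    }

    /** Folds other accumulators into acc without replacing or clearing acc. */
    public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
        for (WeightedAvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    /** Returns the weighted average, or null if nothing has been accumulated. */
    public Long getValue(WeightedAvgAccumulator acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        WeightedAvgSketch f = new WeightedAvgSketch();
        WeightedAvgAccumulator a = f.createAccumulator();
        f.accumulate(a, 10L, 1);
        f.accumulate(a, 20L, 3);        // a: sum = 70, count = 4
        WeightedAvgAccumulator b = f.createAccumulator();
        f.accumulate(b, 30L, 2);        // b: sum = 60, count = 2
        f.merge(a, Arrays.asList(b));   // a: sum = 130, count = 6
        f.retract(a, 10L, 1);           // a: sum = 120, count = 5
        System.out.println(f.getValue(a)); // prints 24
    }
}
```

In the sketch, `merge` only adds to the target accumulator, which matches the documented requirement that the target may already contain aggregated results and must not be replaced or cleared.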