This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98437c765c8957fd5ea392502de393d4ec695ea2
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Nov 30 17:05:58 2018 +0100

    [FLINK-10689] [table] Improve docs and fix bugs of ported classes
---
 .../flink-sql-client-test/pom.xml                  |   5 -
 .../flink/table/functions/AggregateFunction.java   | 102 ++++++++++++++-------
 .../flink/table/functions/FunctionContext.java     |  18 ++--
 .../flink/table/functions/ScalarFunction.java      |  37 ++++----
 .../flink/table/functions/TableFunction.java       |  81 ++++++++++------
 .../flink/table/functions/UserDefinedFunction.java |  28 +++---
 .../utils/userDefinedScalarFunctions.scala         |   1 -
 7 files changed, 170 insertions(+), 102 deletions(-)

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 6a0534c..d564668 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -37,11 +37,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-compiler</artifactId>
-			<scope>provided</scope>
-		</dependency>
 
 		<!-- The following dependencies are for connector/format sql-jars that
 			we copy using the maven-dependency-plugin. When extending the test
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
index 63a4d3f..70066a2 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -22,51 +22,89 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
- * Base class for User-Defined Aggregates.
+ * Base class for user-defined aggregates.
  *
  * <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
  * methods. An {@link AggregateFunction} needs at least three methods:
- *  - createAccumulator,
- *  - accumulate, and
- *  - getValue.
+ *  - <code>createAccumulator</code>,
+ *  - <code>accumulate</code>, and
+ *  - <code>getValue</code>.
 *
  * <p>There are a few other methods that can be optional to have:
- *  - retract,
- *  - merge, and
- *  - resetAccumulator
+ *  - <code>retract</code>,
+ *  - <code>merge</code>, and
+ *  - <code>resetAccumulator</code>.
 *
- * <p>All these methods must be declared publicly, not static and named exactly as the names
- * mentioned above. The methods createAccumulator and getValue are defined in the
- * {@link AggregateFunction} functions, while other methods are explained below.
+ * <p>All these methods must be declared publicly, not static, and named exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} and {@link #getValue} are defined in
+ * the {@link AggregateFunction} functions, while other methods are explained below.
 *
- * <p>Processes the input values and update the provided accumulator instance. The method
+ * <pre>
+ * {@code
+ * Processes the input values and update the provided accumulator instance. The method
 * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
 * requires at least one accumulate() method.
 *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new arrived data).
 *
- * <p>Retracts the input values from the accumulator instance. The current design assumes the
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design assumes the
 * inputs are the values that have been previously accumulated. The method retract can be
 * overloaded with different custom types and arguments. This function must be implemented for
- * datastream bounded over aggregate.
+ * data stream bounded OVER aggregates.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from a new arrived data).
+ *
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Merges a group of accumulator instances into one accumulator instance. This function must be
+ * implemented for data stream session window grouping aggregates and data set grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore user should not replace or clean this instance in the
+ *                    custom merge method.
+ * param: its         an java.lang.Iterable pointed to a group of accumulators that will be
+ *                    merged.
+ *
+ * public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
+ * }
+ * </pre>
 *
- * <p>Merges a group of accumulator instances into one accumulator instance. This function must be
- * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+ * <pre>
+ * {@code
+ * Resets the accumulator for this AggregateFunction. This function must be implemented for
+ * data set grouping aggregates.
 *
- * <p>Resets the accumulator for this {@link AggregateFunction}. This function must be implemented for
- * dataset grouping aggregate.
+ * param: accumulator the accumulator which needs to be reset
 *
+ * public void resetAccumulator(ACC accumulator)
+ * }
+ * </pre>
 *
- * @param T the type of the aggregation result
- * @param ACC the type of the aggregation accumulator. The accumulator is used to keep the
- *            aggregated values which are needed to compute an aggregation result.
- *            AggregateFunction represents its state using accumulator, thereby the state of the
- *            AggregateFunction must be put into the accumulator.
+ * @param <T>   the type of the aggregation result
+ * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
+ *              aggregated values which are needed to compute an aggregation result.
+ *              AggregateFunction represents its state using accumulator, thereby the state of the
+ *              AggregateFunction must be put into the accumulator.
 */
 @PublicEvolving
 public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
 
 	/**
-	 * Creates and init the Accumulator for this {@link AggregateFunction}.
+	 * Creates and initializes the accumulator for this {@link AggregateFunction}. The accumulator
+	 * is used to keep the aggregated values which are needed to compute an aggregation result.
 	 *
 	 * @return the accumulator with the initial value
 	 */
@@ -85,29 +123,31 @@ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
 	public abstract T getValue(ACC accumulator);
 
 	/**
-	 * Returns true if this AggregateFunction can only be applied in an OVER window.
+	 * Returns <code>true</code> if this {@link AggregateFunction} can only be applied in an
+	 * OVER window.
 	 *
-	 * @return true if the AggregateFunction requires an OVER window, false otherwise.
+	 * @return <code>true</code> if the {@link AggregateFunction} requires an OVER window,
+	 *         <code>false</code> otherwise.
 	 */
 	public boolean requiresOver() {
 		return false;
 	}
 
 	/**
-	 * Returns the TypeInformation of the AggregateFunction's result.
+	 * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s result.
 	 *
-	 * @return The TypeInformation of the AggregateFunction's result or null if the result type
-	 *         should be automatically inferred.
+	 * @return The {@link TypeInformation} of the {@link AggregateFunction}'s result or
+	 *         <code>null</code> if the result type should be automatically inferred.
 	 */
 	public TypeInformation<T> getResultType() {
 		return null;
 	}
 
 	/**
-	 * Returns the TypeInformation of the AggregateFunction's accumulator.
+	 * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s accumulator.
 	 *
-	 * @return The TypeInformation of the AggregateFunction's accumulator or null if the
-	 *         accumulator type should be automatically inferred.
+	 * @return The {@link TypeInformation} of the {@link AggregateFunction}'s accumulator or
+	 *         <code>null</code> if the accumulator type should be automatically inferred.
 	 */
 	public TypeInformation<ACC> getAccumulatorType() {
 		return null;
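For illustration, a minimal aggregate that satisfies the contract documented above could look like the sketch below (LongAvg and its accumulator class are invented names for this example, not part of this commit):

    import org.apache.flink.table.functions.AggregateFunction;

    public class LongAvg extends AggregateFunction<Double, LongAvg.LongAvgAccumulator> {

        // mutable accumulator holding the intermediate aggregation state
        public static class LongAvgAccumulator {
            public long sum = 0L;
            public long count = 0L;
        }

        @Override
        public LongAvgAccumulator createAccumulator() {
            return new LongAvgAccumulator();
        }

        // mandatory accumulate() method; can be overloaded with other argument types
        public void accumulate(LongAvgAccumulator acc, Long value) {
            if (value != null) {
                acc.sum += value;
                acc.count += 1;
            }
        }

        // optional retract() method for bounded OVER aggregates
        public void retract(LongAvgAccumulator acc, Long value) {
            if (value != null) {
                acc.sum -= value;
                acc.count -= 1;
            }
        }

        // optional merge() method for session windows and data set grouping aggregates;
        // note that acc may already contain previous results and must not be replaced
        public void merge(LongAvgAccumulator acc, Iterable<LongAvgAccumulator> others) {
            for (LongAvgAccumulator other : others) {
                acc.sum += other.sum;
                acc.count += other.count;
            }
        }

        @Override
        public Double getValue(LongAvgAccumulator acc) {
            return acc.count == 0L ? null : ((double) acc.sum) / acc.count;
        }
    }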
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
index 93b2229..ad3fbd5 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -20,24 +20,28 @@ package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
 
 import java.io.File;
 
 /**
- * A FunctionContext allows to obtain global runtime information about the context in which the
- * user-defined function is executed. The information include the metric group,
- * the distributed cache files, and the global job parameters.
+ * A {@link FunctionContext} allows to obtain global runtime information about the context in which the
+ * user-defined function is executed.
+ *
+ * <p>The information includes the metric group, distributed cache files, and global job parameters.
 */
 @PublicEvolving
 public class FunctionContext {
 
-	/**
-	 * @param context the runtime context in which the Flink Function is executed
-	 */
 	private RuntimeContext context;
 
+	/**
+	 * Wraps the underlying {@link RuntimeContext}.
+	 *
+	 * @param context the runtime context in which Flink's {@link Function} is executed.
+	 */
 	public FunctionContext(RuntimeContext context) {
 		this.context = context;
 	}
@@ -70,7 +74,7 @@ public class FunctionContext {
 	 * @return (default) value associated with the given key
 	 */
 	public String getJobParameter(String key, String defaultValue) {
-		GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
+		final GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
 		if (conf != null && conf.toMap().containsKey(key)) {
 			return conf.toMap().get(key);
 		} else {
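The wrapped context is typically consumed in a function's open() method. A short sketch using the getJobParameter() method shown in the hunk above (the "hashcode_factor" key and the HashingFunction class are hypothetical):

    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    public class HashingFunction extends ScalarFunction {

        private int factor = 12;

        @Override
        public void open(FunctionContext context) throws Exception {
            // read a global job parameter; the second argument is the default value
            factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
        }

        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }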
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
index 091258e..9f7249c 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -24,17 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.ValidationException;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
  * Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one,
  * or multiple scalar values to a new scalar value.
 *
 * <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
- * can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly and named <code>eval</code>. Evaluation
+ * methods can also be overloaded by implementing multiple methods named <code>eval</code>.
 *
 * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
 *
@@ -46,7 +42,8 @@ import java.util.stream.Collectors;
 * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
 * If a user-defined scalar function should not introduce much overhead during runtime, it is
 * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
 */
 @PublicEvolving
 public abstract class ScalarFunction extends UserDefinedFunction {
@@ -60,7 +57,8 @@ public abstract class ScalarFunction extends UserDefinedFunction {
 	 * simple POJOs but might be wrong for more complex, custom, or composite types.
 	 *
 	 * @param signature signature of the method the return type needs to be determined
-	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 * @return {@link TypeInformation} of result type or <code>null</code> if Flink should
+	 *         determine the type
 	 */
 	public TypeInformation<?> getResultType(Class<?>[] signature) {
 		return null;
@@ -70,26 +68,25 @@ public abstract class ScalarFunction extends UserDefinedFunction {
 	 * Returns {@link TypeInformation} about the operands of the evaluation method with a given
 	 * signature.
 	 *
-	 * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
-	 * necessary to determine the parameter {@link TypeInformation} of an evaluation method.
-	 * By default Flink's type extraction facilities are used for this but might be wrong for
-	 * more complex, custom, or composite types.
+	 * <p>In order to perform operand type inference in SQL (especially when <code>NULL</code> is
+	 * used) it might be necessary to determine the parameter {@link TypeInformation} of an
+	 * evaluation method. By default Flink's type extraction facilities are used for this but might
+	 * be wrong for more complex, custom, or composite types.
 	 *
 	 * @param signature signature of the method the operand types need to be determined
 	 * @return {@link TypeInformation} of operand types
 	 */
 	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
-		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+		final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+		for (int i = 0; i < signature.length; i++) {
 			try {
-				return TypeExtractor.getForClass(c);
+				types[i] = TypeExtractor.getForClass(signature[i]);
 			} catch (InvalidTypesException e) {
 				throw new ValidationException(
-					"Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
-					"automatically determined. Please provide type information manually.");
+					"Parameter types of scalar function " + this.getClass().getCanonicalName() +
+					" cannot be automatically determined. Please provide type information manually.");
 			}
-		}).collect(Collectors.toList());
-
-		return typeList.toArray(new TypeInformation<?>[0]);
+		}
+		return types;
 	}
 }
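As a sketch of the eval() overloading and primitive-type recommendation in the javadoc above (MillisToSeconds is an invented example):

    import org.apache.flink.table.functions.ScalarFunction;

    public class MillisToSeconds extends ScalarFunction {

        // primitive signature: avoids boxing in the generated code
        public long eval(long millis) {
            return millis / 1000;
        }

        // overloaded eval() for another input type
        public double eval(double millis) {
            return millis / 1000.0;
        }
    }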
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
index ae6b03a..03cd96f 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
@@ -25,17 +25,13 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.Collector;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
  * Base class for a user-defined table function (UDTF). A user-defined table functions works on
  * zero, one, or multiple scalar values as input and returns multiple rows as output.
 *
 * <p>The behavior of a {@link TableFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly, not static and named "eval".
- * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly, not static, and named <code>eval</code>.
+ * Evaluation methods can also be overloaded by implementing multiple methods named <code>eval</code>.
 *
 * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
 *
@@ -47,25 +43,49 @@ import java.util.stream.Collectors;
 * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
 * If a user-defined table function should not introduce much overhead during runtime, it is
 * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
+ *
+ * <p>For Example:
+ *
+ * <pre>
+ * {@code
+ *   public class Split extends TableFunction<String> {
+ *
+ *     // implement an "eval" method with as many parameters as you want
+ *     public void eval(String str) {
+ *       for (String s : str.split(" ")) {
+ *         collect(s);   // use collect(...) to emit an output row
+ *       }
+ *     }
+ *
+ *     // you can overload the eval method here ...
+ *   }
+ *
+ *   TableEnvironment tEnv = ...
+ *   Table table = ...         // schema: ROW(a VARCHAR)
 *
+ *   // for Scala users
+ *   val split = new Split()
+ *   table.join(split('c) as ('s)).select('a, 's)
 *
- * @param T The type of the output row
+ *   // for Java users
+ *   tEnv.registerFunction("split", new Split()); // register table function first
+ *   table.join(new Table(tEnv, "split(a) as (s)")).select("a, s");
+ *
+ *   // for SQL users
+ *   tEnv.registerFunction("split", new Split()); // register table function first
+ *   tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)");
+ * }
+ * </pre>
+ *
+ * @param <T> The type of the output row
 */
 @PublicEvolving
 public abstract class TableFunction<T> extends UserDefinedFunction {
 
 	/**
-	 * Emit an output row.
-	 *
-	 * @param row the output row
-	 */
-	protected void collect(T row) {
-		collector.collect(row);
-	}
-
-	/**
-	 * The code generated collector used to emit row.
+	 * The code generated collector used to emit rows.
 	 */
 	protected Collector<T> collector;
 
@@ -84,7 +104,7 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
 	 * method. Flink's type extraction facilities can handle basic types or
 	 * simple POJOs but might be wrong for more complex, custom, or composite types.
 	 *
-	 * @return {@link TypeInformation} of result type or null if Flink should determine the type
+	 * @return {@link TypeInformation} of result type or <code>null</code> if Flink should determine the type
 	 */
 	public TypeInformation<T> getResultType() {
 		return null;
@@ -103,18 +123,25 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
 	 * @return {@link TypeInformation} of operand types
 	 */
 	public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
-		List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+		final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+		for (int i = 0; i < signature.length; i++) {
 			try {
-				return TypeExtractor.getForClass(c);
+				types[i] = TypeExtractor.getForClass(signature[i]);
 			} catch (InvalidTypesException e) {
 				throw new ValidationException(
-					"Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
-					"automatically determined. Please provide type information manually.");
+					"Parameter types of table function " + this.getClass().getCanonicalName() +
+					" cannot be automatically determined. Please provide type information manually.");
 			}
-		}).collect(Collectors.toList());
-
-		return typeList.toArray(new TypeInformation<?>[0]);
+		}
+		return types;
 	}
 
+	/**
+	 * Emits an output row.
+	 *
+	 * @param row the output row
+	 */
+	protected final void collect(T row) {
+		collector.collect(row);
+	}
 }
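Related to the getResultType() hunk above: when the emitted type cannot be extracted from the class signature, e.g. for Row outputs, the method can be overridden. A sketch assuming flink-core's Types utility and Row class (KeyValueSplit is an invented example):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    public class KeyValueSplit extends TableFunction<Row> {

        public void eval(String input) {
            for (String pair : input.split(";")) {
                String[] kv = pair.split("=", 2);
                if (kv.length == 2) {
                    collect(Row.of(kv[0], kv[1])); // emit one row per key-value pair
                }
            }
        }

        @Override
        public TypeInformation<Row> getResultType() {
            // Row's field types cannot be inferred from the signature alone
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }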
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
index 6c91c10..0a4e600 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -29,12 +29,21 @@ import java.io.Serializable;
 */
 @PublicEvolving
 public abstract class UserDefinedFunction implements Serializable {
+
+	/**
+	 * Returns a unique, serialized representation for this function.
+	 */
+	public final String functionIdentifier() {
+		final String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
+		return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
+	}
+
 	/**
 	 * Setup method for user-defined function. It can be used for initialization work.
 	 * By default, this method does nothing.
 	 */
 	public void open(FunctionContext context) throws Exception {
-
+		// do nothing
 	}
 
 	/**
@@ -42,24 +51,21 @@ public abstract class UserDefinedFunction implements Serializable {
 	 * By default, this method does nothing.
 	 */
 	public void close() throws Exception {
-
+		// do nothing
 	}
 
 	/**
-	 * @return true if and only if a call to this function is guaranteed to always return
-	 *         the same result given the same parameters; true is assumed by default
-	 *         if user's function is not pure functional, like random(), date(), now()...
-	 *         isDeterministic must return false
+	 * Returns information about the determinism of the function's results.
+	 *
+	 * @return <code>true</code> if and only if a call to this function is guaranteed to
+	 *         always return the same result given the same parameters. <code>true</code> is
+	 *         assumed by default. If the function is not pure functional like
+	 *         <code>random(), date(), now(), ...</code> this method must return <code>false</code>.
 	 */
 	public boolean isDeterministic() {
 		return true;
 	}
 
-	public String functionIdentifier() {
-		String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
-		return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
-	}
-
 	/**
 	 * Returns the name of the UDF that is used for plan explain and logging.
 	 */
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index b70e582..29de7e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -29,7 +29,6 @@ import org.junit.Assert
 
 import scala.annotation.varargs
 import scala.collection.mutable
-import scala.collection.JavaConversions._
 import scala.io.Source
 
 case class SimplePojo(name: String, age: Int)
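Finally, the reworked isDeterministic() contract in practice, as a sketch (RandomSuffix is an invented example):

    import java.util.Random;

    import org.apache.flink.table.functions.ScalarFunction;

    public class RandomSuffix extends ScalarFunction {

        private final Random random = new Random();

        public String eval(String s) {
            return s + "_" + random.nextInt(1000);
        }

        @Override
        public boolean isDeterministic() {
            // the same input can produce different output, so the planner
            // must not constant-fold or reuse results of this function
            return false;
        }
    }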