This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98437c765c8957fd5ea392502de393d4ec695ea2
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Nov 30 17:05:58 2018 +0100

    [FLINK-10689] [table] Improve docs and fix bugs of ported classes
---
 .../flink-sql-client-test/pom.xml                  |   5 -
 .../flink/table/functions/AggregateFunction.java   | 102 ++++++++++++++-------
 .../flink/table/functions/FunctionContext.java     |  18 ++--
 .../flink/table/functions/ScalarFunction.java      |  37 ++++----
 .../flink/table/functions/TableFunction.java       |  81 ++++++++++------
 .../flink/table/functions/UserDefinedFunction.java |  28 +++---
 .../utils/userDefinedScalarFunctions.scala         |   1 -
 7 files changed, 170 insertions(+), 102 deletions(-)

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 6a0534c..d564668 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -37,11 +37,6 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-               <dependency>
-                       <groupId>org.scala-lang</groupId>
-                       <artifactId>scala-compiler</artifactId>
-                       <scope>provided</scope>
-               </dependency>
 
                <!-- The following dependencies are for connector/format sql-jars that
                        we copy using the maven-dependency-plugin. When extending the test
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
index 63a4d3f..70066a2 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
@@ -22,51 +22,89 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
- * Base class for User-Defined Aggregates.
+ * Base class for user-defined aggregates.
  *
  * <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
  * methods. An {@link AggregateFunction} needs at least three methods:
- *  - createAccumulator,
- *  - accumulate, and
- *  - getValue.
+ *  - <code>createAccumulator</code>,
+ *  - <code>accumulate</code>, and
+ *  - <code>getValue</code>.
  *
  * <p>There are a few other methods that can be optional to have:
- *  - retract,
- *  - merge, and
- *  - resetAccumulator
+ *  - <code>retract</code>,
+ *  - <code>merge</code>, and
+ *  - <code>resetAccumulator</code>.
  *
- * <p>All these methods must be declared publicly, not static and named exactly as the names
- * mentioned above. The methods createAccumulator and getValue are defined in the
- * {@link AggregateFunction} functions, while other methods are explained below.
+ * <p>All these methods must be declared publicly, not static, and named exactly as the names
+ * mentioned above. The methods {@link #createAccumulator()} and {@link #getValue} are defined in
+ * the {@link AggregateFunction} class, while other methods are explained below.
  *
- * <p>Processes the input values and update the provided accumulator instance. The method
+ * <pre>
+ * {@code
+ * Processes the input values and updates the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  *
- * <p>Retracts the input values from the accumulator instance. The current design assumes the
+ * public void accumulate(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
- * datastream bounded over aggregate.
+ * data stream bounded OVER aggregates.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ *
+ * public void retract(ACC accumulator, [user defined inputs])
+ * }
+ * </pre>
+ *
+ * <pre>
+ * {@code
+ * Merges a group of accumulator instances into one accumulator instance. This function must be
+ * implemented for data stream session window grouping aggregates and data set grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore, the user should not replace or clean this instance in the
+ *                    custom merge method.
+ * param: its         a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ *
+ * public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
+ * }
+ * </pre>
  *
- * <p>Merges a group of accumulator instances into one accumulator instance. This function must be
- * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+ * <pre>
+ * {@code
+ * Resets the accumulator for this AggregateFunction. This function must be implemented for
+ * data set grouping aggregates.
  *
- * <p>Resets the accumulator for this {@link AggregateFunction}. This function must be implemented for
- * dataset grouping aggregate.
+ * param: accumulator the accumulator which needs to be reset
  *
+ * public void resetAccumulator(ACC accumulator)
+ * }
+ * </pre>
  *
- * @param T   the type of the aggregation result
- * @param ACC the type of the aggregation accumulator. The accumulator is used to keep the
- *             aggregated values which are needed to compute an aggregation result.
- *             AggregateFunction represents its state using accumulator, thereby the state of the
- *             AggregateFunction must be put into the accumulator.
+ * @param <T>   the type of the aggregation result
+ * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
+ *              aggregated values which are needed to compute an aggregation result.
+ *              AggregateFunction represents its state using the accumulator, thus the state of the
+ *              AggregateFunction must be put into the accumulator.
  */
 @PublicEvolving
 public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
 
        /**
-        * Creates and init the Accumulator for this {@link AggregateFunction}.
+        * Creates and initializes the accumulator for this {@link AggregateFunction}. The accumulator
+        * is used to keep the aggregated values which are needed to compute an aggregation result.
         *
         * @return the accumulator with the initial value
         */
@@ -85,29 +123,31 @@ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
        public abstract T getValue(ACC accumulator);
 
        /**
-        * Returns true if this AggregateFunction can only be applied in an OVER window.
+        * Returns <code>true</code> if this {@link AggregateFunction} can only be applied in an
+        * OVER window.
         *
-        * @return true if the AggregateFunction requires an OVER window, false otherwise.
+        * @return <code>true</code> if the {@link AggregateFunction} requires an OVER window,
+        *         <code>false</code> otherwise.
         */
        public boolean requiresOver() {
                return false;
        }
 
        /**
-        * Returns the TypeInformation of the AggregateFunction's result.
+        * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s result.
         *
-        * @return The TypeInformation of the AggregateFunction's result or null if the result type
-        *         should be automatically inferred.
+        * @return The {@link TypeInformation} of the {@link AggregateFunction}'s result or
+        *         <code>null</code> if the result type should be automatically inferred.
         */
        public TypeInformation<T> getResultType() {
                return null;
        }
 
        /**
-        * Returns the TypeInformation of the AggregateFunction's accumulator.
+        * Returns the {@link TypeInformation} of the {@link AggregateFunction}'s accumulator.
         *
-        * @return The TypeInformation of the AggregateFunction's accumulator or null if the
-        *         accumulator type should be automatically inferred.
+        * @return The {@link TypeInformation} of the {@link AggregateFunction}'s accumulator or
+        *         <code>null</code> if the accumulator type should be automatically inferred.
         */
        public TypeInformation<ACC> getAccumulatorType() {
                return null;
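
For illustration, a minimal sketch of how the documented methods fit together in one AggregateFunction. The IntSumAggregate and SumAcc names, and the choice of a long-sum aggregate, are invented for this example and are not part of the commit:

    import org.apache.flink.table.functions.AggregateFunction;

    // Mutable accumulator POJO holding the running sum.
    class SumAcc {
        public long sum = 0L;
    }

    public class IntSumAggregate extends AggregateFunction<Long, SumAcc> {

        @Override
        public SumAcc createAccumulator() {
            return new SumAcc(); // initial state of the aggregation
        }

        // Resolved by name, not by override; at least one accumulate() is required.
        public void accumulate(SumAcc acc, Integer value) {
            acc.sum += value;
        }

        // Optional: only needed for bounded OVER aggregates.
        public void retract(SumAcc acc, Integer value) {
            acc.sum -= value;
        }

        // Optional: needed for session window and data set grouping aggregates.
        public void merge(SumAcc acc, Iterable<SumAcc> others) {
            for (SumAcc other : others) {
                acc.sum += other.sum; // fold the other accumulators into acc
            }
        }

        @Override
        public Long getValue(SumAcc acc) {
            return acc.sum;
        }
    }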
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
index 93b2229..ad3fbd5 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/FunctionContext.java
@@ -20,24 +20,28 @@ package org.apache.flink.table.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
 
 import java.io.File;
 
 /**
- * A FunctionContext allows to obtain global runtime information about the context in which the
- * user-defined function is executed. The information include the metric group,
- * the distributed cache files, and the global job parameters.
+ * A {@link FunctionContext} allows obtaining global runtime information about the context in
+ * which the user-defined function is executed.
+ *
+ * <p>The information includes the metric group, distributed cache files, and global job parameters.
  */
 @PublicEvolving
 public class FunctionContext {
 
-       /**
-        * @param context the runtime context in which the Flink Function is executed
-        */
        private RuntimeContext context;
 
+       /**
+        * Wraps the underlying {@link RuntimeContext}.
+        *
+        * @param context the runtime context in which Flink's {@link Function} is executed.
+        */
        public FunctionContext(RuntimeContext context) {
                this.context = context;
        }
@@ -70,7 +74,7 @@ public class FunctionContext {
         * @return (default) value associated with the given key
         */
        public String getJobParameter(String key, String defaultValue) {
-               GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
+               final GlobalJobParameters conf = context.getExecutionConfig().getGlobalJobParameters();
                if (conf != null && conf.toMap().containsKey(key)) {
                        return conf.toMap().get(key);
                } else {
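
As a usage illustration for this class, a function can read global job parameters in its open() method. The "greeting.fallback" key and the GreetingFunction name below are made up for the sketch:

    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    public class GreetingFunction extends ScalarFunction {

        private String fallback;

        @Override
        public void open(FunctionContext context) throws Exception {
            // Returns the default if the key is absent from the global job parameters.
            fallback = context.getJobParameter("greeting.fallback", "hello");
        }

        public String eval(String name) {
            return (name != null ? name : fallback) + "!";
        }
    }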
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
index 091258e..9f7249c 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/ScalarFunction.java
@@ -24,17 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.ValidationException;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
  * Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
  * or multiple scalar values to a new scalar value.
  *
  * <p>The behavior of a {@link ScalarFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
- * can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly and named <code>eval</code>. Evaluation
+ * methods can also be overloaded by implementing multiple methods named <code>eval</code>.
  *
  * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
  *
@@ -46,7 +42,8 @@ import java.util.stream.Collectors;
  * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
  * If a user-defined scalar function should not introduce much overhead during runtime, it is
  * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
  */
 @PublicEvolving
 public abstract class ScalarFunction extends UserDefinedFunction {
@@ -60,7 +57,8 @@ public abstract class ScalarFunction extends UserDefinedFunction {
         * simple POJOs but might be wrong for more complex, custom, or composite types.
         *
         * @param signature signature of the method the return type needs to be determined
-        * @return {@link TypeInformation} of result type or null if Flink should determine the type
+        * @return {@link TypeInformation} of result type or <code>null</code> if Flink should
+        *         determine the type
         */
        public TypeInformation<?> getResultType(Class<?>[] signature) {
                return null;
@@ -70,26 +68,25 @@ public abstract class ScalarFunction extends UserDefinedFunction {
         * Returns {@link TypeInformation} about the operands of the evaluation method with a given
         * signature.
         *
-        * <p>In order to perform operand type inference in SQL (especially when NULL is used) it might be
-        * necessary to determine the parameter {@link TypeInformation} of an evaluation method.
-        * By default Flink's type extraction facilities are used for this but might be wrong for
-        * more complex, custom, or composite types.
+        * <p>In order to perform operand type inference in SQL (especially when <code>NULL</code> is
+        * used) it might be necessary to determine the parameter {@link TypeInformation} of an
+        * evaluation method. By default Flink's type extraction facilities are used for this but might
+        * be wrong for more complex, custom, or composite types.
         *
         * @param signature signature of the method the operand types need to be determined
         * @return {@link TypeInformation} of operand types
         */
        public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
-               List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+               final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+               for (int i = 0; i < signature.length; i++) {
                        try {
-                               return TypeExtractor.getForClass(c);
+                               types[i] = TypeExtractor.getForClass(signature[i]);
                        } catch (InvalidTypesException e) {
                                throw new ValidationException(
-                                               "Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
-                                               "automatically determined. Please provide type information manually.");
+                                       "Parameter types of scalar function " + this.getClass().getCanonicalName() +
+                                       " cannot be automatically determined. Please provide type information manually.");
                        }
-               }).collect(Collectors.toList());
-
-               return typeList.toArray(new TypeInformation<?>[0]);
+               }
+               return types;
        }
 }
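
A short sketch of the overloading behavior described in the javadoc above; the HashCode class is illustrative only and assumes nothing beyond the eval() contract documented here:

    import org.apache.flink.table.functions.ScalarFunction;

    public class HashCode extends ScalarFunction {

        // Primitive parameter and result types keep the generated code cheap.
        public int eval(String s) {
            return s == null ? 0 : s.hashCode();
        }

        // Overloaded variant; the planner picks it based on the argument type.
        public int eval(long l) {
            return Long.hashCode(l);
        }
    }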
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
index ae6b03a..03cd96f 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java
@@ -25,17 +25,13 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.Collector;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
 /**
  * Base class for a user-defined table function (UDTF). A user-defined table function works on
  * zero, one, or multiple scalar values as input and returns multiple rows as output.
  *
  * <p>The behavior of a {@link TableFunction} can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly, not static and named "eval".
- * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly, not static, and named <code>eval</code>.
+ * Evaluation methods can also be overloaded by implementing multiple methods named <code>eval</code>.
  *
  * <p>User-defined functions must have a default constructor and must be instantiable during runtime.
  *
@@ -47,25 +43,49 @@ import java.util.stream.Collectors;
  * <p>Internally, the Table/SQL API code generation works with primitive values as much as possible.
  * If a user-defined table function should not introduce much overhead during runtime, it is
  * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
+ * classes. <code>DATE/TIME</code> is equal to <code>int</code>, <code>TIMESTAMP</code> is equal
+ * to <code>long</code>.
+ *
+ * <p>For example:
+ *
+ * <pre>
+ * {@code
+ *   public class Split extends TableFunction<String> {
+ *
+ *     // implement an "eval" method with as many parameters as you want
+ *     public void eval(String str) {
+ *       for (String s : str.split(" ")) {
+ *         collect(s);   // use collect(...) to emit an output row
+ *       }
+ *     }
+ *
+ *     // you can overload the eval method here ...
+ *   }
+ *
+ *   TableEnvironment tEnv = ...
+ *   Table table = ...    // schema: ROW(a VARCHAR)
  *
+ *   // for Scala users
+ *   val split = new Split()
+ *   table.join(split('a) as ('s)).select('a, 's)
  *
- * @param T The type of the output row
+ *   // for Java users
+ *   tEnv.registerFunction("split", new Split());   // register table function first
+ *   table.join(new Table(tEnv, "split(a) as (s)")).select("a, s");
+ *
+ *   // for SQL users
+ *   tEnv.registerFunction("split", new Split());   // register table function first
+ *   tEnv.sqlQuery("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)");
+ * }
+ * </pre>
+ *
+ * @param <T> The type of the output row
  */
 @PublicEvolving
 public abstract class TableFunction<T> extends UserDefinedFunction {
 
        /**
-        * Emit an output row.
-        *
-        * @param row the output row
-        */
-       protected void collect(T row) {
-               collector.collect(row);
-       }
-
-       /**
-        * The code generated collector used to emit row.
+        * The code generated collector used to emit rows.
         */
        protected Collector<T> collector;
 
@@ -84,7 +104,7 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
         * method. Flink's type extraction facilities can handle basic types or
         * simple POJOs but might be wrong for more complex, custom, or composite types.
         *
-        * @return {@link TypeInformation} of result type or null if Flink should determine the type
+        * @return {@link TypeInformation} of result type or <code>null</code> if Flink should determine the type
         */
        public TypeInformation<T> getResultType() {
                return null;
@@ -103,18 +123,25 @@ public abstract class TableFunction<T> extends UserDefinedFunction {
         * @return {@link TypeInformation} of operand types
         */
        public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
-
-               List<TypeInformation<?>> typeList = Arrays.asList(signature).stream().map(c -> {
+               final TypeInformation<?>[] types = new TypeInformation<?>[signature.length];
+               for (int i = 0; i < signature.length; i++) {
                        try {
-                               return TypeExtractor.getForClass(c);
+                               types[i] = TypeExtractor.getForClass(signature[i]);
                        } catch (InvalidTypesException e) {
                                throw new ValidationException(
-                                               "Parameter types of table function " + this.getClass().getCanonicalName() + " cannot be " +
-                                               "automatically determined. Please provide type information manually.");
+                                       "Parameter types of table function " + this.getClass().getCanonicalName() +
+                                       " cannot be automatically determined. Please provide type information manually.");
                        }
-               }).collect(Collectors.toList());
-
-               return typeList.toArray(new TypeInformation<?>[0]);
+               }
+               return types;
        }
 
+       /**
+        * Emits an output row.
+        *
+        * @param row the output row
+        */
+       protected final void collect(T row) {
+               collector.collect(row);
+       }
 }
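
Where the default type extraction is not sufficient, the hooks above can be overridden. A hedged sketch for a table function emitting Row values; the KeyValueSplit name and its schema are invented, and it assumes Flink's Types.ROW and Row.of helpers:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    public class KeyValueSplit extends TableFunction<Row> {

        public void eval(String kv) {
            String[] parts = kv.split("=", 2);
            if (parts.length == 2) {
                collect(Row.of(parts[0], parts[1])); // one output row per pair
            }
        }

        @Override
        public TypeInformation<Row> getResultType() {
            // Row types cannot be inferred automatically, so declare the schema.
            return Types.ROW(Types.STRING, Types.STRING);
        }
    }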
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
index 6c91c10..0a4e600 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunction.java
@@ -29,12 +29,21 @@ import java.io.Serializable;
  */
 @PublicEvolving
 public abstract class UserDefinedFunction implements Serializable {
+
+       /**
+        * Returns a unique, serialized representation for this function.
+        */
+       public final String functionIdentifier() {
+               final String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
+               return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
+       }
+
        /**
         * Setup method for user-defined function. It can be used for initialization work.
         * By default, this method does nothing.
         */
        public void open(FunctionContext context) throws Exception {
-
+               // do nothing
        }
 
        /**
@@ -42,24 +51,21 @@ public abstract class UserDefinedFunction implements Serializable {
         * By default, this method does nothing.
         */
        public void close() throws Exception {
-
+               // do nothing
        }
 
        /**
-        * @return true if and only if a call to this function is guaranteed to always return
-        * the same result given the same parameters; true is assumed by default
-        * if user's function is not pure functional, like random(), date(), now()...
-        * isDeterministic must return false
+        * Returns information about the determinism of the function's results.
+        *
+        * @return <code>true</code> if and only if a call to this function is guaranteed to
+        *         always return the same result given the same parameters. <code>true</code> is
+        *         assumed by default. If the function is not purely functional, like
+        *         <code>random(), date(), now(), ...</code>, this method must return <code>false</code>.
         */
        public boolean isDeterministic() {
                return true;
        }
 
-       public String functionIdentifier() {
-               String md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)));
-               return getClass().getCanonicalName().replace('.', '$').concat("$").concat(md5);
-       }
-
        /**
         * Returns the name of the UDF that is used for plan explain and logging.
         */
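
To illustrate the determinism contract documented above, a non-deterministic function should override the flag. The RandomSuffix class is a made-up example:

    import org.apache.flink.table.functions.ScalarFunction;

    public class RandomSuffix extends ScalarFunction {

        public String eval(String s) {
            return s + "-" + Math.random(); // result differs on every call
        }

        @Override
        public boolean isDeterministic() {
            // Prevents the planner from constant-folding or caching calls.
            return false;
        }
    }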
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index b70e582..29de7e0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -29,7 +29,6 @@ import org.junit.Assert
 
 import scala.annotation.varargs
 import scala.collection.mutable
-import scala.collection.JavaConversions._
 import scala.io.Source
 
 case class SimplePojo(name: String, age: Int)
