[FLINK-7182] Activate checkstyle flink-java/functions

This closes #4333.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec1044d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec1044d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec1044d9 Branch: refs/heads/master Commit: ec1044d985f24b9c1e45d1d25ef060a504e1695d Parents: 137463c Author: Dawid Wysakowicz <dwysakow...@apache.org> Authored: Fri Jul 14 10:28:14 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Tue Jul 18 16:15:00 2017 +0200 ---------------------------------------------------------------------- .../flink/api/java/functions/FirstReducer.java | 12 +- .../api/java/functions/FlatMapIterator.java | 18 +- .../api/java/functions/FormattingMapper.java | 4 + .../api/java/functions/FunctionAnnotation.java | 108 ++++++----- .../api/java/functions/GroupReduceIterator.java | 17 +- .../flink/api/java/functions/IdPartitioner.java | 3 + .../api/java/functions/SampleInCoordinator.java | 3 +- .../api/java/functions/SampleInPartition.java | 4 +- .../api/java/functions/SampleWithFraction.java | 1 + .../api/java/functions/SelectByMaxFunction.java | 17 +- .../api/java/functions/SelectByMinFunction.java | 19 +- .../api/java/functions/SemanticPropUtil.java | 188 +++++++++---------- .../api/java/functions/ClosureCleanerTest.java | 6 +- .../java/functions/SelectByFunctionsTest.java | 69 +++---- .../java/functions/SemanticPropUtilTest.java | 60 +++--- .../SemanticPropertiesPrecedenceTest.java | 17 +- .../SemanticPropertiesProjectionTest.java | 24 ++- .../SemanticPropertiesTranslationTest.java | 118 ++++++------ 18 files changed, 363 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java index 2063a12..db2bf43 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java @@ -23,6 +23,10 @@ import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.util.Collector; +/** + * Reducer that only emits the first N elements in a group. + * @param <T> + */ @Internal public class FirstReducer<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> { private static final long serialVersionUID = 1L; @@ -37,16 +41,16 @@ public class FirstReducer<T> implements GroupReduceFunction<T, T>, GroupCombineF public void reduce(Iterable<T> values, Collector<T> out) throws Exception { int emitCnt = 0; - for(T val : values) { + for (T val : values) { out.collect(val); - + emitCnt++; - if(emitCnt == count) { + if (emitCnt == count) { break; } } } - + @Override public void combine(Iterable<T> values, Collector<T> out) throws Exception { reduce(values, out); http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java index 8f47d8f..be4daef 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java @@ -18,18 +18,18 @@ package org.apache.flink.api.java.functions; -import java.util.Iterator; - import org.apache.flink.annotation.PublicEvolving; import 
org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.util.Collector; +import java.util.Iterator; + /** * A convenience variant of the {@link org.apache.flink.api.common.functions.RichFlatMapFunction} that returns elements through an iterator, rather then * through a collector. In all other respects, it behaves exactly like the FlatMapFunction. - * <p> - * The function needs to be serializable, as defined in {@link java.io.Serializable}. - * + * + * <p>The function needs to be serializable, as defined in {@link java.io.Serializable}. + * * @param <IN> Type of the input elements. * @param <OUT> Type of the returned elements. */ @@ -41,17 +41,17 @@ public abstract class FlatMapIterator<IN, OUT> extends RichFlatMapFunction<IN, O /** * The core method of the function. Takes an element from the input data set and transforms * it into zero, one, or more elements. - * + * * @param value The input value. * @return An iterator over the returned elements. - * + * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ public abstract Iterator<OUT> flatMap(IN value) throws Exception; - + // -------------------------------------------------------------------------------------------- - + /** * Delegates calls to the {@link #flatMap(Object)} method. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java index 95ae4a2..5d01018 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java @@ -23,6 +23,10 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter; +/** + * Mapper that converts values to strings using a {@link TextFormatter}. + * @param <T> + */ @Internal public class FormattingMapper<T> implements MapFunction<T, String> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java index f01d9d8..93746fb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java @@ -47,11 +47,10 @@ import java.util.Set; * } * }</pre> * - * <p> - * All annotations take Strings with expressions that refer to (nested) value fields of the input and output types of a function. + * + * <p>All annotations take Strings with expressions that refer to (nested) value fields of the input and output types of a function. 
* Field expressions for of composite data types (tuples, POJOs, Scala case classes) can be expressed in * different ways, depending on the data type they refer to. - *</p> * * <ul> * <li>Java tuple data types (such as {@link org.apache.flink.api.java.tuple.Tuple3}): A tuple field can be addressed using @@ -65,14 +64,13 @@ import java.util.Set; * of a case class that describes a 2d-coordinate.</li> * </ul> * - * <p> - * Nested fields are addressed by navigation, e.g., <code>"f1.xValue"</code> addresses the field <code>xValue</code> of a POJO type, + * + * <p>Nested fields are addressed by navigation, e.g., <code>"f1.xValue"</code> addresses the field <code>xValue</code> of a POJO type, * that is stored at the second field of a Java tuple. In order to refer to all fields of a composite type (or the composite type itself) * such as a tuple, POJO, or case class type, a <code>"*"</code> wildcard can be used, e.g., <code>f2.*</code> or <code>f2</code> reference all fields * of a composite type at the third position of a Java tuple. - * </p> * - * <b>NOTE: The use of semantic annotation is optional! + * <p><b>NOTE: The use of semantic annotation is optional! * If used correctly, semantic annotations can help the Flink optimizer to generate more efficient execution plans. * However, incorrect semantic annotations can cause the optimizer to generate incorrect execution plans which compute wrong results! * So be careful when adding semantic annotations. @@ -86,27 +84,27 @@ public class FunctionAnnotation { * The ForwardedFields annotation declares fields which are never modified by the annotated function and * which are forwarded at the same position to the output or unchanged copied to another position in the output. * - * Fields that are forwarded at the same position can be specified by their position. + * <p>Fields that are forwarded at the same position can be specified by their position. 
* The specified position must be valid for the input and output data type and have the same type. * For example {@code {@literal @}ForwardedFields({"f2"})} declares that the third field of a Java input tuple is * copied to the third field of an output tuple. * - * Fields which are unchanged copied to another position in the output are declared by specifying the + * <p>Fields which are unchanged copied to another position in the output are declared by specifying the * source field expression in the input and the target field expression in the output. * {@code {@literal @}ForwardedFields({"f0->f2"})} denotes that the first field of the Java input tuple is * unchanged copied to the third field of the Java output tuple. When using the wildcard ("*") ensure that * the number of declared fields and their types in input and output type match. * - * Multiple forwarded fields can be annotated in one ({@code {@literal @}ForwardedFields({"f2; f3->f0; f4"})}) + * <p>Multiple forwarded fields can be annotated in one ({@code {@literal @}ForwardedFields({"f2; f3->f0; f4"})}) * or separate Strings ({@code {@literal @}ForwardedFields({"f2", "f3->f0", "f4"})}). * - * <b>NOTE: The use of the ForwardedFields annotation is optional. + * <p><b>NOTE: The use of the ForwardedFields annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. * However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! * It is NOT required that all forwarded fields are declared, but all declarations must be correct. * </b> * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. 
* */ @@ -121,30 +119,30 @@ public class FunctionAnnotation { * never modified by the annotated function and which are forwarded at the same position to the * output or unchanged copied to another position in the output. * - * Fields that are forwarded from the first input at the same position in the output can be + * <p>Fields that are forwarded from the first input at the same position in the output can be * specified by their position. The specified position must be valid for the input and output data type and have the same type. * For example {@code {@literal @}ForwardedFieldsFirst({"f2"})} declares that the third field of a Java input tuple at the first input is * copied to the third field of an output tuple. * - * Fields which are unchanged copied to another position in the output are declared by specifying the + * <p>Fields which are unchanged copied to another position in the output are declared by specifying the * source field expression in the input and the target field expression in the output. * {@code {@literal @}ForwardedFieldsFirst({"f0->f2"})} denotes that the first field of the Java input tuple at the first input is * unchanged copied to the third field of the Java output tuple. When using the wildcard ("*") ensure that * the number of declared fields and their types in input and output type match. * - * Multiple forwarded fields can be annotated in one ({@code {@literal @}ForwardedFieldsFirst({"f2; f3->f0; f4"})}) + * <p>Multiple forwarded fields can be annotated in one ({@code {@literal @}ForwardedFieldsFirst({"f2; f3->f0; f4"})}) * or separate Strings ({@code {@literal @}ForwardedFieldsFirst({"f2", "f3->f0", "f4"})}). * - * <b>NOTE: The use of the ForwardedFieldsFirst annotation is optional. + * <p><b>NOTE: The use of the ForwardedFieldsFirst annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. 
* However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! * It is NOT required that all forwarded fields are declared, but all declarations must be correct. * </b> * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * - * Forwarded fields from the second input can be specified using the + * <p>Forwarded fields from the second input can be specified using the * {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} annotation. * */ @@ -159,30 +157,30 @@ public class FunctionAnnotation { * never modified by the annotated function and which are forwarded at the same position to the * output or unchanged copied to another position in the output. * - * Fields that are forwarded from the second input at the same position in the output can be + * <p>Fields that are forwarded from the second input at the same position in the output can be * specified by their position. The specified position must be valid for the input and output data type and have the same type. * For example {@code {@literal @}ForwardedFieldsSecond({"f2"})} declares that the third field of a Java input tuple at the second input is * copied to the third field of an output tuple. * - * Fields which are unchanged copied to another position in the output are declared by specifying the + * <p>Fields which are unchanged copied to another position in the output are declared by specifying the * source field expression in the input and the target field expression in the output. * {@code {@literal @}ForwardedFieldsSecond({"f0->f2"})} denotes that the first field of the Java input tuple at the second input is * unchanged copied to the third field of the Java output tuple. 
When using the wildcard ("*") ensure that * the number of declared fields and their types in input and output type match. * - * Multiple forwarded fields can be annotated in one ({@code {@literal @}ForwardedFieldsSecond({"f2; f3->f0; f4"})}) + * <p>Multiple forwarded fields can be annotated in one ({@code {@literal @}ForwardedFieldsSecond({"f2; f3->f0; f4"})}) * or separate Strings ({@code {@literal @}ForwardedFieldsSecond({"f2", "f3->f0", "f4"})}). * - * <b>NOTE: The use of the ForwardedFieldsSecond annotation is optional. + * <p><b>NOTE: The use of the ForwardedFieldsSecond annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. * However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! * It is NOT required that all forwarded fields are declared, but all declarations must be correct. * </b> * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * - * Forwarded fields from the first input can be specified using the + * <p>Forwarded fields from the first input can be specified using the * {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} annotation. * */ @@ -197,20 +195,20 @@ public class FunctionAnnotation { * ALL other fields are considered to be unmodified at the same position. * Hence, the NonForwardedFields annotation is inverse to the {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} annotation. * - * <b>NOTE: The use of the NonForwardedFields annotation is optional. + * <p><b>NOTE: The use of the NonForwardedFields annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. 
* However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! * Since all not declared fields are considered to be forwarded, it is required that ALL non-forwarded fields are declared. * </b> * - * Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFields({"f1; f3"})</code> + * <p>Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFields({"f1; f3"})</code> * declares that the second and fourth field of a Java tuple are modified and all other fields are are not changed and remain * on their position. A NonForwardedFields annotation can only be used on functions where the type of the input and output are identical. * - * Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFields({"f1; f3"})</code>) + * <p>Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFields({"f1; f3"})</code>) * or separate Strings (<code>\@NonForwardedFields({"f1", "f3"})</code>). * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * * @see org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields @@ -227,21 +225,21 @@ public class FunctionAnnotation { * ALL other fields are considered to be unmodified at the same position. * Hence, the NonForwardedFieldsFirst annotation is inverse to the {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} annotation. * - * <b>NOTE: The use of the NonForwardedFieldsFirst annotation is optional. + * <p><b>NOTE: The use of the NonForwardedFieldsFirst annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. 
* However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! * Since all not declared fields are considered to be forwarded, it is required that ALL non-forwarded fields of the first input are declared. * </b> * - * Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFieldsFirst({"f1; f3"})</code> + * <p>Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFieldsFirst({"f1; f3"})</code> * declares that the second and fourth field of a Java tuple from the first input are modified and * all other fields of the first input are are not changed and remain on their position. * A NonForwardedFieldsFirst annotation can only be used on functions where the type of the first input and the output are identical. * - * Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFieldsFirst({"f1; f3"})</code>) + * <p>Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFieldsFirst({"f1; f3"})</code>) * or separate Strings (<code>\@NonForwardedFieldsFirst({"f1", "f3"})</code>). * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * * @see org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields @@ -259,21 +257,21 @@ public class FunctionAnnotation { * ALL other fields are considered to be unmodified at the same position. * Hence, the NonForwardedFieldsSecond annotation is inverse to the {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} annotation. * - * <b>NOTE: The use of the NonForwardedFieldsSecond annotation is optional. + * <p><b>NOTE: The use of the NonForwardedFieldsSecond annotation is optional. 
* If used correctly, it can help the Flink optimizer to generate more efficient execution plans. * However if used incorrectly, it can cause invalid plan choices and the computation of wrong results! * Since all not declared fields are considered to be forwarded, it is required that ALL non-forwarded fields of the second input are declared. * </b> * - * Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFieldsSecond({"f1; f3"})</code> + * <p>Non-forwarded fields are declared as a list of field expressions, e.g., <code>\@NonForwardedFieldsSecond({"f1; f3"})</code> * declares that the second and fourth field of a Java tuple from the second input are modified and * all other fields of the second input are are not changed and remain on their position. * A NonForwardedFieldsSecond annotation can only be used on functions where the type of the second input and the output are identical. * - * Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFieldsSecond({"f1; f3"})</code>) + * <p>Multiple non-forwarded fields can be annotated in one (<code>\@NonForwardedFieldsSecond({"f1; f3"})</code>) * or separate Strings (<code>\@NonForwardedFieldsSecond({"f1", "f3"})</code>). * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * * @see org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields @@ -291,20 +289,20 @@ public class FunctionAnnotation { * For example, fields which are evaluated in conditional statements or used for computations are considered to be read. * Fields which are only unmodified copied to the output without evaluating their values are NOT considered to be read. * - * <b>NOTE: The use of the ReadFields annotation is optional. 
+ * <p><b>NOTE: The use of the ReadFields annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. * The ReadFields annotation requires that ALL read fields are declared. * Otherwise, it can cause invalid plan choices and the computation of wrong results! * Declaring a non-read field as read is not harmful but might reduce optimization potential. * </b> * - * Read fields are declared as a list of field expressions, e.g., <code>\@ReadFields({"f0; f2"})</code> declares the first and third + * <p>Read fields are declared as a list of field expressions, e.g., <code>\@ReadFields({"f0; f2"})</code> declares the first and third * field of a Java input tuple to be read. All other fields are considered to not influence the behavior of the function. * - * Multiple read fields can be declared in one <code>\@ReadFields({"f0; f2"})</code> or + * <p>Multiple read fields can be declared in one <code>\@ReadFields({"f0; f2"})</code> or * multiple separate Strings <code>\@ReadFields({"f0", "f2"})</code>. * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * */ @@ -321,21 +319,21 @@ public class FunctionAnnotation { * For example, fields which are evaluated in conditional statements or used for computations are considered to be read. * Fields which are only unmodified copied to the output without evaluating their values are NOT considered to be read. * - * <b>NOTE: The use of the ReadFieldsFirst annotation is optional. + * <p><b>NOTE: The use of the ReadFieldsFirst annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. 
* The ReadFieldsFirst annotation requires that ALL read fields of the first input are declared. * Otherwise, it can cause invalid plan choices and the computation of wrong results! * Declaring a non-read field as read is not harmful but might reduce optimization potential. * </b> * - * Read fields are declared as a list of field expressions, e.g., <code>\@ReadFieldsFirst({"f0; f2"})</code> declares the first and third + * <p>Read fields are declared as a list of field expressions, e.g., <code>\@ReadFieldsFirst({"f0; f2"})</code> declares the first and third * field of a Java input tuple of the first input to be read. * All other fields of the first input are considered to not influence the behavior of the function. * - * Multiple read fields can be declared in one <code>\@ReadFieldsFirst({"f0; f2"})</code> or + * <p>Multiple read fields can be declared in one <code>\@ReadFieldsFirst({"f0; f2"})</code> or * multiple separate Strings <code>\@ReadFieldsFirst({"f0", "f2"})</code>. * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * */ @@ -352,21 +350,21 @@ public class FunctionAnnotation { * For example, fields which are evaluated in conditional statements or used for computations are considered to be read. * Fields which are only unmodified copied to the output without evaluating their values are NOT considered to be read. * - * <b>NOTE: The use of the ReadFieldsSecond annotation is optional. + * <p><b>NOTE: The use of the ReadFieldsSecond annotation is optional. * If used correctly, it can help the Flink optimizer to generate more efficient execution plans. * The ReadFieldsSecond annotation requires that ALL read fields of the second input are declared. 
* Otherwise, it can cause invalid plan choices and the computation of wrong results! * Declaring a non-read field as read is not harmful but might reduce optimization potential. * </b> * - * Read fields are declared as a list of field expressions, e.g., <code>\@ReadFieldsSecond({"f0; f2"})</code> declares the first and third + * <p>Read fields are declared as a list of field expressions, e.g., <code>\@ReadFieldsSecond({"f0; f2"})</code> declares the first and third * field of a Java input tuple of the second input to be read. * All other fields of the second input are considered to not influence the behavior of the function. * - * Multiple read fields can be declared in one <code>\@ReadFieldsSecond({"f0; f2"})</code> or + * <p>Multiple read fields can be declared in one <code>\@ReadFieldsSecond({"f0; f2"})</code> or * multiple separate Strings <code>\@ReadFieldsSecond({"f0", "f2"})</code>. * - * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for + * <p>Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for * details on field expressions such as nested fields and wildcard. * */ @@ -381,7 +379,7 @@ public class FunctionAnnotation { * The SkipCodeAnalysis annotation declares that a function will not be analyzed by Flink's * code analysis capabilities independent of the configured {@link org.apache.flink.api.common.CodeAnalysisMode}. * - * If this annotation is not present the static code analyzer pre-interprets user-defined + * <p>If this annotation is not present the static code analyzer pre-interprets user-defined * functions in order to get implementation insights for program improvements that can be * printed to the log as hints, automatically applied, or disabled (see * {@link org.apache.flink.api.common.ExecutionConfig}). 
@@ -404,7 +402,7 @@ public class FunctionAnnotation { /** * Reads the annotations of a user defined function with one input and returns semantic properties according to the forwarded fields annotated. - * + * * @param udfClass The user defined function, represented by its class. * @return The DualInputSemanticProperties containing the forwarded fields. */ @@ -415,17 +413,17 @@ public class FunctionAnnotation { ReadFields readSet = udfClass.getAnnotation(ReadFields.class); Set<Annotation> annotations = new HashSet<Annotation>(); - if(forwardedFields != null) { + if (forwardedFields != null) { annotations.add(forwardedFields); } - if(nonForwardedFields != null) { - if(!annotations.isEmpty()) { + if (nonForwardedFields != null) { + if (!annotations.isEmpty()) { throw new InvalidProgramException("Either " + ForwardedFields.class.getSimpleName() + " or " + NonForwardedFields.class.getSimpleName() + " can be annotated to a function, not both."); } annotations.add(nonForwardedFields); } - if(readSet != null) { + if (readSet != null) { annotations.add(readSet); } @@ -443,7 +441,7 @@ public class FunctionAnnotation { // get readSet annotation from stub ForwardedFieldsFirst forwardedFields1 = udfClass.getAnnotation(ForwardedFieldsFirst.class); - ForwardedFieldsSecond forwardedFields2= udfClass.getAnnotation(ForwardedFieldsSecond.class); + ForwardedFieldsSecond forwardedFields2 = udfClass.getAnnotation(ForwardedFieldsSecond.class); // get readSet annotation from stub NonForwardedFieldsFirst nonForwardedFields1 = udfClass.getAnnotation(NonForwardedFieldsFirst.class); http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java index 
722d99b..b381049 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java @@ -18,23 +18,26 @@ package org.apache.flink.api.java.functions; -import java.util.Iterator; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.util.Collector; +import java.util.Iterator; + +/** + * Base class that simplifies reducing all values provided as {@link Iterable}. + * @param <IN> + * @param <OUT> + */ @PublicEvolving public abstract class GroupReduceIterator<IN, OUT> extends RichGroupReduceFunction<IN, OUT> { - - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; public abstract Iterator<OUT> reduceGroup(Iterable<IN> values) throws Exception; - - + // ------------------------------------------------------------------------------------------- - + @Override public final void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception { for (Iterator<OUT> iter = reduceGroup(values); iter.hasNext(); ) { http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java index 31caf86..4104eda 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java @@ -21,6 +21,9 @@ package org.apache.flink.api.java.functions; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.Partitioner; +/** + * Partitioner that partitions by id. 
+ */ @Internal public class IdPartitioner implements Partitioner<Integer> { http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java index 61b28af..31bf211 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java @@ -15,12 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.functions; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.java.sampling.IntermediateSampleData; import org.apache.flink.api.java.sampling.DistributedRandomSampler; +import org.apache.flink.api.java.sampling.IntermediateSampleData; import org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement; import org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java index d524200..d38c0fe 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java @@ -15,12 +15,13 @@ 
* See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.functions; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RichMapPartitionFunction; -import org.apache.flink.api.java.sampling.IntermediateSampleData; import org.apache.flink.api.java.sampling.DistributedRandomSampler; +import org.apache.flink.api.java.sampling.IntermediateSampleData; import org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement; import org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement; import org.apache.flink.util.Collector; @@ -70,4 +71,3 @@ public class SampleInPartition<T> extends RichMapPartitionFunction<T, Intermedia } } - http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java index 04730f2..358cda7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.api.java.functions; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java index 6e806db..e756c63 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.functions; import org.apache.flink.annotation.Internal; @@ -22,13 +23,17 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +/** + * Function that enables selection by maximal value of a field. + * @param <T> + */ @Internal public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> { private static final long serialVersionUID = 1L; - + // Fields which are used as KEYS private int[] fields; - + /** * Constructor which is overwriting the default constructor. * @param type Types of tuple whether to check if given fields are key types. @@ -38,7 +43,7 @@ public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> { */ public SelectByMaxFunction(TupleTypeInfo<T> type, int... 
fields) { this.fields = fields; - + // Check correctness of each position for (int field : fields) { // Is field inside array @@ -57,12 +62,12 @@ public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> { } /** - * Reduce implementation, returns bigger tuple or value1 if both tuples are + * Reduce implementation, returns bigger tuple or value1 if both tuples are * equal. Comparison highly depends on the order and amount of fields chosen * as indices. All given fields (at construction time) are checked in the same - * order as defined (at construction time). If both tuples are equal in one + * order as defined (at construction time). If both tuples are equal in one * index, the next index is compared. Or if no next index is available value1 - * is returned. + * is returned. * The tuple which has a bigger value at one index will be returned. */ @SuppressWarnings({ "unchecked", "rawtypes" }) http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java index fdd5f7f..1ea4117 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.api.java.functions; import org.apache.flink.annotation.Internal; @@ -22,13 +23,17 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +/** + * Function that enables selection by minimal value of a field. + * @param <T> + */ @Internal public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> { private static final long serialVersionUID = 1L; - + // Fields which are used as KEYS private int[] fields; - + /** * Constructor which is overwriting the default constructor. * @param type Types of tuple whether to check if given fields are key types. @@ -38,7 +43,7 @@ public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> { */ public SelectByMinFunction(TupleTypeInfo<T> type, int... fields) { this.fields = fields; - + // Check correctness of each position for (int field : fields) { // Is field inside array @@ -55,14 +60,14 @@ public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> { } } - + /** - * Reduce implementation, returns smaller tuple or value1 if both tuples are + * Reduce implementation, returns smaller tuple or value1 if both tuples are * equal. Comparison highly depends on the order and amount of fields chosen * as indices. All given fields (at construction time) are checked in the same - * order as defined (at construction time). If both tuples are equal in one + * order as defined (at construction time). If both tuples are equal in one * index, the next index is compared. Or if no next index is available value1 - * is returned. + * is returned. * The tuple which has a smaller value at one index will be returned. 
*/ @SuppressWarnings({ "unchecked", "rawtypes" }) http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index aedba15..e254d94 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -16,19 +16,19 @@ * limitations under the License. */ - package org.apache.flink.api.java.functions; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.DualInputSemanticProperties; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; @@ -38,7 +38,6 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields import 
org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import java.lang.annotation.Annotation; @@ -55,14 +54,14 @@ import java.util.regex.Pattern; @Internal public final class SemanticPropUtil { - private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]"; - private final static String REGEX_SINGLE_FIELD = "[\\p{L}\\p{Digit}_\\$]+"; - private final static String REGEX_NESTED_FIELDS = "((" + REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\."+ REGEX_WILDCARD +")?"; + private static final String REGEX_WILDCARD = "[\\" + Keys.ExpressionKeys.SELECT_ALL_CHAR + "\\" + Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA + "]"; + private static final String REGEX_SINGLE_FIELD = "[\\p{L}\\p{Digit}_\\$]+"; + private static final String REGEX_NESTED_FIELDS = "((" + REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\." 
+ REGEX_WILDCARD + ")?"; - private final static String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + ";)*(" + REGEX_NESTED_FIELDS + ");?)"; - private final static String REGEX_FORWARD = "(("+ REGEX_NESTED_FIELDS +"|"+ REGEX_WILDCARD +")->(" + REGEX_NESTED_FIELDS + "|"+ REGEX_WILDCARD +"))"; - private final static String REGEX_FIELD_OR_FORWARD = "(" + REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")"; - private final static String REGEX_ANNOTATION = "((" + REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)"; + private static final String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + ";)*(" + REGEX_NESTED_FIELDS + ");?)"; + private static final String REGEX_FORWARD = "((" + REGEX_NESTED_FIELDS + "|" + REGEX_WILDCARD + ")->(" + REGEX_NESTED_FIELDS + "|" + REGEX_WILDCARD + "))"; + private static final String REGEX_FIELD_OR_FORWARD = "(" + REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")"; + private static final String REGEX_ANNOTATION = "((" + REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)"; private static final Pattern PATTERN_WILDCARD = Pattern.compile(REGEX_WILDCARD); private static final Pattern PATTERN_FORWARD = Pattern.compile(REGEX_FORWARD); @@ -70,8 +69,7 @@ public final class SemanticPropUtil { private static final Pattern PATTERN_LIST = Pattern.compile(REGEX_LIST); private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_NESTED_FIELDS); - public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType) - { + public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType) { Character.isJavaIdentifierStart(1); @@ -79,17 +77,17 @@ public final class SemanticPropUtil { int[] sourceOffsets = new int[inType.getArity()]; sourceOffsets[0] = 0; - for(int i=1; i<inType.getArity(); i++) { - sourceOffsets[i] = inType.getTypeAt(i-1).getTotalFields() + sourceOffsets[i-1]; + for (int i = 1; i < inType.getArity(); i++) { + sourceOffsets[i] = 
inType.getTypeAt(i - 1).getTotalFields() + sourceOffsets[i - 1]; } int targetOffset = 0; - for(int i=0; i<fields.length; i++) { + for (int i = 0; i < fields.length; i++) { int sourceOffset = sourceOffsets[fields[i]]; int numFieldsToCopy = inType.getTypeAt(fields[i]).getTotalFields(); - for(int j=0; j<numFieldsToCopy; j++) { - ssp.addForwardedField(sourceOffset+j, targetOffset+j); + for (int j = 0; j < numFieldsToCopy; j++) { + ssp.addForwardedField(sourceOffset + j, targetOffset + j); } targetOffset += numFieldsToCopy; } @@ -98,45 +96,44 @@ public final class SemanticPropUtil { } public static DualInputSemanticProperties createProjectionPropertiesDual( - int[] fields, boolean[] isFromFirst, TypeInformation<?> inType1, TypeInformation<?> inType2) - { + int[] fields, boolean[] isFromFirst, TypeInformation<?> inType1, TypeInformation<?> inType2) { DualInputSemanticProperties dsp = new DualInputSemanticProperties(); int[] sourceOffsets1; - if(inType1 instanceof TupleTypeInfo<?>) { + if (inType1 instanceof TupleTypeInfo<?>) { sourceOffsets1 = new int[inType1.getArity()]; sourceOffsets1[0] = 0; - for(int i=1; i<inType1.getArity(); i++) { - sourceOffsets1[i] = ((TupleTypeInfo<?>)inType1).getTypeAt(i-1).getTotalFields() + sourceOffsets1[i-1]; + for (int i = 1; i < inType1.getArity(); i++) { + sourceOffsets1[i] = ((TupleTypeInfo<?>) inType1).getTypeAt(i - 1).getTotalFields() + sourceOffsets1[i - 1]; } } else { - sourceOffsets1 = new int[] {0}; + sourceOffsets1 = new int[]{0}; } int[] sourceOffsets2; - if(inType2 instanceof TupleTypeInfo<?>) { + if (inType2 instanceof TupleTypeInfo<?>) { sourceOffsets2 = new int[inType2.getArity()]; sourceOffsets2[0] = 0; - for(int i=1; i<inType2.getArity(); i++) { - sourceOffsets2[i] = ((TupleTypeInfo<?>)inType2).getTypeAt(i-1).getTotalFields() + sourceOffsets2[i-1]; + for (int i = 1; i < inType2.getArity(); i++) { + sourceOffsets2[i] = ((TupleTypeInfo<?>) inType2).getTypeAt(i - 1).getTotalFields() + sourceOffsets2[i - 1]; } } else { - 
sourceOffsets2 = new int[] {0}; + sourceOffsets2 = new int[]{0}; } int targetOffset = 0; - for(int i=0; i<fields.length; i++) { + for (int i = 0; i < fields.length; i++) { int sourceOffset; int numFieldsToCopy; int input; - if(isFromFirst[i]) { + if (isFromFirst[i]) { input = 0; if (fields[i] == -1) { sourceOffset = 0; numFieldsToCopy = inType1.getTotalFields(); } else { sourceOffset = sourceOffsets1[fields[i]]; - numFieldsToCopy = ((TupleTypeInfo<?>)inType1).getTypeAt(fields[i]).getTotalFields(); + numFieldsToCopy = ((TupleTypeInfo<?>) inType1).getTypeAt(fields[i]).getTotalFields(); } } else { input = 1; @@ -145,12 +142,12 @@ public final class SemanticPropUtil { numFieldsToCopy = inType2.getTotalFields(); } else { sourceOffset = sourceOffsets2[fields[i]]; - numFieldsToCopy = ((TupleTypeInfo<?>)inType2).getTypeAt(fields[i]).getTotalFields(); + numFieldsToCopy = ((TupleTypeInfo<?>) inType2).getTypeAt(fields[i]).getTotalFields(); } } - for(int j=0; j<numFieldsToCopy; j++) { - dsp.addForwardedField(input, sourceOffset+j, targetOffset+j); + for (int j = 0; j < numFieldsToCopy; j++) { + dsp.addForwardedField(input, sourceOffset + j, targetOffset + j); } targetOffset += numFieldsToCopy; } @@ -203,33 +200,33 @@ public final class SemanticPropUtil { DualInputSemanticProperties offsetProps = new DualInputSemanticProperties(); // add offset to read fields on first input - if(props.getReadFields(0) != null) { + if (props.getReadFields(0) != null) { FieldSet offsetReadFields = new FieldSet(); - for(int r : props.getReadFields(0)) { - offsetReadFields = offsetReadFields.addField(r+offset1); + for (int r : props.getReadFields(0)) { + offsetReadFields = offsetReadFields.addField(r + offset1); } offsetProps.addReadFields(0, offsetReadFields); } // add offset to read fields on second input - if(props.getReadFields(1) != null) { + if (props.getReadFields(1) != null) { FieldSet offsetReadFields = new FieldSet(); - for(int r : props.getReadFields(1)) { - offsetReadFields = 
offsetReadFields.addField(r+offset2); + for (int r : props.getReadFields(1)) { + offsetReadFields = offsetReadFields.addField(r + offset2); } offsetProps.addReadFields(1, offsetReadFields); } // add offset to forward fields on first input - for(int s = 0; s < numInputFields1; s++) { + for (int s = 0; s < numInputFields1; s++) { FieldSet targetFields = props.getForwardingTargetFields(0, s); - for(int t : targetFields) { + for (int t : targetFields) { offsetProps.addForwardedField(0, s + offset1, t); } } // add offset to forward fields on second input - for(int s = 0; s < numInputFields2; s++) { + for (int s = 0; s < numInputFields2; s++) { FieldSet targetFields = props.getForwardingTargetFields(1, s); - for(int t : targetFields) { + for (int t : targetFields) { offsetProps.addForwardedField(1, s + offset2, t); } } @@ -261,11 +258,11 @@ public final class SemanticPropUtil { } else if (ann instanceof ForwardedFieldsFirst || ann instanceof ForwardedFieldsSecond || ann instanceof NonForwardedFieldsFirst || ann instanceof NonForwardedFieldsSecond || ann instanceof ReadFieldsFirst || ann instanceof ReadFieldsSecond) { - throw new InvalidSemanticAnnotationException("Annotation "+ann.getClass()+" invalid for single input function."); + throw new InvalidSemanticAnnotationException("Annotation " + ann.getClass() + " invalid for single input function."); } } - if(forwarded != null || nonForwarded != null || read != null) { + if (forwarded != null || nonForwarded != null || read != null) { SingleInputSemanticProperties result = new SingleInputSemanticProperties(); getSemanticPropsSingleFromString(result, forwarded, nonForwarded, read, inType, outType); return result; @@ -307,11 +304,11 @@ public final class SemanticPropUtil { } } - if(forwardedFirst != null || nonForwardedFirst != null || readFirst != null || - forwardedSecond != null ||nonForwardedSecond != null || readSecond != null) { + if (forwardedFirst != null || nonForwardedFirst != null || readFirst != null || + 
forwardedSecond != null || nonForwardedSecond != null || readSecond != null) { DualInputSemanticProperties result = new DualInputSemanticProperties(); getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond, - nonForwardedFirst, nonForwardedSecond, readFirst, readSecond, inType1, inType2, outType); + nonForwardedFirst, nonForwardedSecond, readFirst, readSecond, inType1, inType2, outType); return result; } return null; @@ -331,20 +328,20 @@ public final class SemanticPropUtil { boolean hasForwardedAnnotation = false; boolean hasNonForwardedAnnotation = false; // check for forwarded annotations - if(forwarded != null && forwarded.length > 0) { + if (forwarded != null && forwarded.length > 0) { hasForwardedAnnotation = true; } // check non-forwarded annotations - if(nonForwarded != null && nonForwarded.length > 0) { + if (nonForwarded != null && nonForwarded.length > 0) { hasNonForwardedAnnotation = true; } - if(hasForwardedAnnotation && hasNonForwardedAnnotation) { + if (hasForwardedAnnotation && hasNonForwardedAnnotation) { throw new InvalidSemanticAnnotationException("Either ForwardedFields OR " + "NonForwardedFields annotation permitted, NOT both."); - } else if(hasForwardedAnnotation) { + } else if (hasForwardedAnnotation) { parseForwardedFields(result, forwarded, inType, outType, 0, skipIncompatibleTypes); - } else if(hasNonForwardedAnnotation) { + } else if (hasNonForwardedAnnotation) { parseNonForwardedFields(result, nonForwarded, inType, outType, 0, skipIncompatibleTypes); } parseReadFields(result, readSet, inType, 0); @@ -356,8 +353,8 @@ public final class SemanticPropUtil { readFieldsFirst, String[] readFieldsSecond, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) { getSemanticPropsDualFromString(result, forwardedFirst, forwardedSecond, nonForwardedFirst, - nonForwardedSecond, readFieldsFirst, readFieldsSecond, inType1, inType2, outType, - false); + nonForwardedSecond, readFieldsFirst, readFieldsSecond, 
inType1, inType2, outType, + false); } public static void getSemanticPropsDualFromString(DualInputSemanticProperties result, @@ -372,38 +369,38 @@ public final class SemanticPropUtil { boolean hasNonForwardedFirstAnnotation = false; boolean hasNonForwardedSecondAnnotation = false; // check for forwarded annotations - if(forwardedFirst != null && forwardedFirst.length > 0) { + if (forwardedFirst != null && forwardedFirst.length > 0) { hasForwardedFirstAnnotation = true; } - if(forwardedSecond != null && forwardedSecond.length > 0) { + if (forwardedSecond != null && forwardedSecond.length > 0) { hasForwardedSecondAnnotation = true; } // check non-forwarded annotations - if(nonForwardedFirst != null && nonForwardedFirst.length > 0) { + if (nonForwardedFirst != null && nonForwardedFirst.length > 0) { hasNonForwardedFirstAnnotation = true; } - if(nonForwardedSecond != null && nonForwardedSecond.length > 0) { + if (nonForwardedSecond != null && nonForwardedSecond.length > 0) { hasNonForwardedSecondAnnotation = true; } - if(hasForwardedFirstAnnotation && hasNonForwardedFirstAnnotation) { + if (hasForwardedFirstAnnotation && hasNonForwardedFirstAnnotation) { throw new InvalidSemanticAnnotationException("Either ForwardedFieldsFirst OR " + "NonForwardedFieldsFirst annotation permitted, NOT both."); } - if(hasForwardedSecondAnnotation && hasNonForwardedSecondAnnotation) { + if (hasForwardedSecondAnnotation && hasNonForwardedSecondAnnotation) { throw new InvalidSemanticAnnotationException("Either ForwardedFieldsSecond OR " + "NonForwardedFieldsSecond annotation permitted, NOT both."); } - if(hasForwardedFirstAnnotation) { + if (hasForwardedFirstAnnotation) { parseForwardedFields(result, forwardedFirst, inType1, outType, 0, skipIncompatibleTypes); - } else if(hasNonForwardedFirstAnnotation) { + } else if (hasNonForwardedFirstAnnotation) { parseNonForwardedFields(result, nonForwardedFirst, inType1, outType, 0, skipIncompatibleTypes); } - if(hasForwardedSecondAnnotation) { + if 
(hasForwardedSecondAnnotation) { parseForwardedFields(result, forwardedSecond, inType2, outType, 1, skipIncompatibleTypes); - } else if(hasNonForwardedSecondAnnotation) { + } else if (hasNonForwardedSecondAnnotation) { parseNonForwardedFields(result, nonForwardedSecond, inType2, outType, 1, skipIncompatibleTypes); } @@ -411,7 +408,6 @@ public final class SemanticPropUtil { parseReadFields(result, readFieldsSecond, inType2, 1); } - private static void parseForwardedFields(SemanticProperties sp, String[] forwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) { @@ -434,8 +430,7 @@ public final class SemanticPropUtil { if (!inType.equals(outType)) { if (skipIncompatibleTypes) { continue; - } - else { + } else { throw new InvalidSemanticAnnotationException("Forwarded field annotation \"" + s + "\" with wildcard only allowed for identical input and output types."); } @@ -468,8 +463,7 @@ public final class SemanticPropUtil { if (!areFieldsCompatible(sourceStr, inType, targetStr, outType, !skipIncompatibleTypes)) { if (skipIncompatibleTypes) { continue; - } - else { + } else { throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match."); } } @@ -511,8 +505,7 @@ public final class SemanticPropUtil { if (!areFieldsCompatible(fieldStr, inType, fieldStr, outType, !skipIncompatibleTypes)) { if (skipIncompatibleTypes) { continue; - } - else { + } else { throw new InvalidSemanticAnnotationException("Referenced fields of forwarded field annotation \"" + s + "\" do not match."); } } @@ -541,12 +534,12 @@ public final class SemanticPropUtil { private static void parseNonForwardedFields(SemanticProperties sp, String[] nonForwardedStr, TypeInformation<?> inType, TypeInformation<?> outType, int input, boolean skipIncompatibleTypes) { - if(nonForwardedStr == null) { + if (nonForwardedStr == null) { return; } FieldSet excludedFields = new FieldSet(); - for(String s : 
nonForwardedStr) { + for (String s : nonForwardedStr) { // remove white characters s = s.replaceAll("\\s", ""); @@ -555,18 +548,17 @@ public final class SemanticPropUtil { continue; } - if(!inType.equals(outType)) { + if (!inType.equals(outType)) { if (skipIncompatibleTypes) { continue; - } - else { + } else { throw new InvalidSemanticAnnotationException("Non-forwarded fields annotation only allowed for identical input and output types."); } } Matcher matcher = PATTERN_LIST.matcher(s); if (!matcher.matches()) { - throw new InvalidSemanticAnnotationException("Invalid format of non-forwarded fields annotation \""+s+"\"."); + throw new InvalidSemanticAnnotationException("Invalid format of non-forwarded fields annotation \"" + s + "\"."); } // process individual fields @@ -580,17 +572,17 @@ public final class SemanticPropUtil { for (FlatFieldDescriptor ffd : inFFDs) { excludedFields = excludedFields.addField(ffd.getPosition()); } - } catch(InvalidFieldReferenceException ifre) { - throw new InvalidSemanticAnnotationException("Invalid field reference in non-forwarded fields annotation \""+fieldStr+"\".",ifre); + } catch (InvalidFieldReferenceException ifre) { + throw new InvalidSemanticAnnotationException("Invalid field reference in non-forwarded fields annotation \"" + fieldStr + "\".", ifre); } } } - for(int i = 0; i < inType.getTotalFields(); i++) { - if(!excludedFields.contains(i)) { - if(sp instanceof SingleInputSemanticProperties) { - ((SingleInputSemanticProperties) sp).addForwardedField(i,i); - } else if(sp instanceof DualInputSemanticProperties) { + for (int i = 0; i < inType.getTotalFields(); i++) { + if (!excludedFields.contains(i)) { + if (sp instanceof SingleInputSemanticProperties) { + ((SingleInputSemanticProperties) sp).addForwardedField(i, i); + } else if (sp instanceof DualInputSemanticProperties) { ((DualInputSemanticProperties) sp).addForwardedField(input, i, i); } } @@ -600,11 +592,11 @@ public final class SemanticPropUtil { private static void 
parseReadFields(SemanticProperties sp, String[] readFieldStrings, TypeInformation<?> inType, int input) { - if(readFieldStrings == null) { + if (readFieldStrings == null) { return; } - for(String s : readFieldStrings) { + for (String s : readFieldStrings) { FieldSet readFields = new FieldSet(); @@ -662,12 +654,10 @@ public final class SemanticPropUtil { // get target type information TypeInformation<?> targetType = getExpressionTypeInformation(targetField, outType); return sourceType.equals(targetType); - } - catch (InvalidFieldReferenceException e) { + } catch (InvalidFieldReferenceException e) { if (throwException) { throw e; - } - else { + } else { return false; } } @@ -675,30 +665,30 @@ public final class SemanticPropUtil { private static TypeInformation<?> getExpressionTypeInformation(String fieldStr, TypeInformation<?> typeInfo) { Matcher wildcardMatcher = PATTERN_WILDCARD.matcher(fieldStr); - if(wildcardMatcher.matches()) { + if (wildcardMatcher.matches()) { return typeInfo; } else { Matcher expMatcher = PATTERN_FIELD.matcher(fieldStr); - if(!expMatcher.matches()) { - throw new InvalidFieldReferenceException("Invalid field expression \""+fieldStr+"\"."); + if (!expMatcher.matches()) { + throw new InvalidFieldReferenceException("Invalid field expression \"" + fieldStr + "\"."); } - if(typeInfo instanceof CompositeType<?>) { + if (typeInfo instanceof CompositeType<?>) { return ((CompositeType<?>) typeInfo).getTypeAt(expMatcher.group(1)); } else { - throw new InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not possible on atomic type ("+typeInfo+")."); + throw new InvalidFieldReferenceException("Nested field expression \"" + fieldStr + "\" not possible on atomic type (" + typeInfo + ")."); } } } private static List<FlatFieldDescriptor> getFlatFields(String fieldStr, TypeInformation<?> typeInfo) { - if(typeInfo instanceof CompositeType<?>) { + if (typeInfo instanceof CompositeType<?>) { return ((CompositeType<?>) 
typeInfo).getFlatFields(fieldStr); } else { Matcher wildcardMatcher = PATTERN_WILDCARD.matcher(fieldStr); - if(wildcardMatcher.matches()) { + if (wildcardMatcher.matches()) { return Collections.singletonList(new FlatFieldDescriptor(0, typeInfo)); } else { - throw new InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not possible on atomic type ("+typeInfo+")."); + throw new InvalidFieldReferenceException("Nested field expression \"" + fieldStr + "\" not possible on atomic type (" + typeInfo + ")."); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java index 496c61a..04f9b52 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java @@ -15,16 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.functions; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ClosureCleaner; + import org.junit.Assert; import org.junit.Test; import java.io.Serializable; +/** + * Tests for {@link ClosureCleaner}. 
+ */ public class ClosureCleanerTest { @Test(expected = InvalidProgramException.class) @@ -195,4 +200,3 @@ class NestedNonSerializableMapCreator implements MapCreator { } - http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java index 52b59ec..3cfd74e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java @@ -15,36 +15,40 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.functions; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; + import org.junit.Assert; import org.junit.Test; +/** + * Tests for {@link SelectByMaxFunction} and {@link SelectByMinFunction}. 
+ */ public class SelectByFunctionsTest { - + private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>( BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); - + private final Tuple5<Integer, Long, String, Long, Integer> bigger = new Tuple5<Integer, Long, String, Long, Integer>(10, 100L, "HelloWorld", 200L, 20); private final Tuple5<Integer, Long, String, Long, Integer> smaller = new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, "Hello", 50L, 15); - + //Special case where only the last value determines if bigger or smaller private final Tuple5<Integer, Long, String, Long, Integer> specialCaseBigger = new Tuple5<Integer, Long, String, Long, Integer>(10, 100L, "HelloWorld", 200L, 17); private final Tuple5<Integer, Long, String, Long, Integer> specialCaseSmaller = new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, "Hello", 50L, 17); - - + /** * This test validates whether the order of tuples has any impact on the outcome and if the bigger tuple is returned. */ @Test public void testMaxByComparison() { SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0}); - + try { Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger)); Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller)); @@ -52,17 +56,17 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + // ----------------------- MAXIMUM FUNCTION TEST BELOW -------------------------- - + /** * This test cases checks when two tuples only differ in one value, but this value is not * in the fields list. 
In that case it should be seen as equal and then the first given tuple (value1) should be returned by reduce(). */ @Test public void testMaxByComparisonSpecialCase1() { - SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,3}); - + SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0, 3}); + try { Assert.assertSame("SelectByMax must return the first given tuple", specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger)); Assert.assertSame("SelectByMax must return the first given tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger)); @@ -70,14 +74,14 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + /** * This test cases checks when two tuples only differ in one value. */ @Test public void testMaxByComparisonSpecialCase2() { - SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3}); - + SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0, 2, 1, 4, 3}); + try { Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(specialCaseBigger, bigger)); Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger)); @@ -85,14 +89,14 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + /** * This test validates that equality is independent of the amount of used indices. 
*/ @Test public void testMaxByComparisonMultiple() { - SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4}); - + SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0, 1, 2, 3, 4}); + try { Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger)); Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller)); @@ -100,14 +104,14 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + /** - * Checks whether reduce does behave as expected if both values are the same object. + * Checks whether reduce does behave as expected if both values are the same object. */ @Test public void testMaxByComparisonMustReturnATuple() { SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0}); - + try { Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, bigger)); Assert.assertSame("SelectByMax must return smaller tuple", smaller, maxByTuple.reduce(smaller, smaller)); @@ -115,16 +119,16 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + // ----------------------- MINIMUM FUNCTION TEST BELOW -------------------------- - + /** * This test validates whether the order of tuples has any impact on the outcome and if the smaller tuple is returned. 
*/ @Test public void testMinByComparison() { SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0}); - + try { Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger)); Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller)); @@ -132,15 +136,15 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + /** * This test cases checks when two tuples only differ in one value, but this value is not * in the fields list. In that case it should be seen as equal and then the first given tuple (value1) should be returned by reduce(). */ @Test public void testMinByComparisonSpecialCase1() { - SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,3}); - + SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0, 3}); + try { Assert.assertSame("SelectByMin must return the first given tuple", specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger)); Assert.assertSame("SelectByMin must return the first given tuple", bigger, minByTuple.reduce(bigger, specialCaseBigger)); @@ -148,15 +152,15 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + /** * This test validates that when two tuples only differ in one value and that value's index is given * at construction time. The smaller tuple must be returned then. 
*/ @Test public void testMinByComparisonSpecialCase2() { - SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3}); - + SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0, 2, 1, 4, 3}); + try { Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(specialCaseSmaller, smaller)); Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, specialCaseSmaller)); @@ -164,14 +168,14 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - + /** - * Checks whether reduce does behave as expected if both values are the same object. + * Checks whether reduce does behave as expected if both values are the same object. 
*/ @Test public void testMinByComparisonMultiple() { - SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4}); - + SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0, 1, 2, 3, 4}); + try { Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger)); Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller)); @@ -179,6 +183,5 @@ public class SelectByFunctionsTest { Assert.fail("No exception should be thrown while comparing both tuples"); } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java index 453c022..8c4c32d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.java.functions; import org.apache.flink.api.common.operators.DualInputSemanticProperties; @@ -32,12 +31,16 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; + import org.junit.Test; import static org.junit.Assert.assertTrue; +/** + * Tests for semantic properties utils. 
+ */ public class SemanticPropUtilTest { - + private final TypeInformation<?> threeIntTupleType = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); @@ -74,13 +77,13 @@ public class SemanticPropUtilTest { @Test public void testSingleProjectionProperties() { - int[] pMap = new int[] {3,0,4}; + int[] pMap = new int[] {3, 0, 4}; SingleInputSemanticProperties sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) fiveIntTupleType); assertTrue(sp.getForwardingTargetFields(0, 0).contains(1)); assertTrue(sp.getForwardingTargetFields(0, 3).contains(0)); assertTrue(sp.getForwardingTargetFields(0, 4).contains(2)); - pMap = new int[] {2,2,1,1}; + pMap = new int[] {2, 2, 1, 1}; sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) fiveIntTupleType); assertTrue(sp.getForwardingTargetFields(0, 1).size() == 2); assertTrue(sp.getForwardingTargetFields(0, 1).contains(2)); @@ -89,14 +92,14 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(0, 2).contains(0)); assertTrue(sp.getForwardingTargetFields(0, 2).contains(1)); - pMap = new int[] {2,0}; + pMap = new int[] {2, 0}; sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) nestedTupleType); assertTrue(sp.getForwardingTargetFields(0, 4).contains(0)); assertTrue(sp.getForwardingTargetFields(0, 0).contains(1)); assertTrue(sp.getForwardingTargetFields(0, 1).contains(2)); assertTrue(sp.getForwardingTargetFields(0, 2).contains(3)); - pMap = new int[] {2,0,1}; + pMap = new int[] {2, 0, 1}; sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) deepNestedTupleType); assertTrue(sp.getForwardingTargetFields(0, 6).contains(0)); assertTrue(sp.getForwardingTargetFields(0, 0).contains(1)); @@ -119,7 +122,7 @@ public class SemanticPropUtilTest { @Test public void testDualProjectionProperties() { - int[] pMap = new 
int[]{4,2,0,1,3,4}; + int[] pMap = new int[]{4, 2, 0, 1, 3, 4}; boolean[] iMap = new boolean[]{true, true, false, true, false, false}; DualInputSemanticProperties sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, fiveIntTupleType, fiveIntTupleType); @@ -130,7 +133,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(1, 3).contains(4)); assertTrue(sp.getForwardingTargetFields(1, 4).contains(5)); - pMap = new int[]{4,2,0,4,0,1}; + pMap = new int[]{4, 2, 0, 4, 0, 1}; iMap = new boolean[]{true, true, false, true, false, false}; sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, fiveIntTupleType, fiveIntTupleType); assertTrue(sp.getForwardingTargetFields(0, 4).size() == 2); @@ -142,7 +145,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(1, 0).contains(4)); assertTrue(sp.getForwardingTargetFields(1, 1).contains(5)); - pMap = new int[]{2,1,0,1}; + pMap = new int[]{2, 1, 0, 1}; iMap = new boolean[]{false, false, true, true}; sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, nestedTupleType, threeIntTupleType); assertTrue(sp.getForwardingTargetFields(1, 2).contains(0)); @@ -152,7 +155,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(0, 2).contains(4)); assertTrue(sp.getForwardingTargetFields(0, 3).contains(5)); - pMap = new int[]{1,0,0}; + pMap = new int[]{1, 0, 0}; iMap = new boolean[]{false, false, true}; sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, nestedTupleType, deepNestedTupleType); assertTrue(sp.getForwardingTargetFields(1, 1).contains(0)); @@ -165,7 +168,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(0, 1).contains(7)); assertTrue(sp.getForwardingTargetFields(0, 2).contains(8)); - pMap = new int[]{4,2,1,0}; + pMap = new int[]{4, 2, 1, 0}; iMap = new boolean[]{true, false, true, false}; sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, fiveIntTupleType, 
pojoInTupleType); assertTrue(sp.getForwardingTargetFields(0, 4).contains(0)); @@ -176,7 +179,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(0, 1).contains(5)); assertTrue(sp.getForwardingTargetFields(1, 0).contains(6)); - pMap = new int[]{2,3,-1,0}; + pMap = new int[]{2, 3, -1, 0}; iMap = new boolean[]{true, true, false, true}; sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, fiveIntTupleType, intType); assertTrue(sp.getForwardingTargetFields(0, 2).contains(0)); @@ -184,7 +187,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(1, 0).contains(2)); assertTrue(sp.getForwardingTargetFields(0, 0).contains(3)); - pMap = new int[]{-1,-1}; + pMap = new int[]{-1, -1}; iMap = new boolean[]{false, true}; sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, intType, nestedPojoType); assertTrue(sp.getForwardingTargetFields(1, 0).contains(0)); @@ -195,7 +198,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(1, 5).contains(5)); assertTrue(sp.getForwardingTargetFields(0, 0).contains(6)); - pMap = new int[]{-1,-1}; + pMap = new int[]{-1, -1}; iMap = new boolean[]{true, false}; sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap, intType, nestedPojoType); assertTrue(sp.getForwardingTargetFields(0, 0).contains(0)); @@ -220,7 +223,7 @@ public class SemanticPropUtilTest { semProps.addForwardedField(0, 4); semProps.addForwardedField(2, 0); semProps.addForwardedField(4, 3); - semProps.addReadFields(new FieldSet(0,3)); + semProps.addReadFields(new FieldSet(0, 3)); SemanticProperties offsetProps = SemanticPropUtil.addSourceFieldOffset(semProps, 5, 0); @@ -325,7 +328,7 @@ public class SemanticPropUtilTest { @Test public void testForwardedNoArrowIndividualStrings() { - String[] forwardedFields = {"f2","f3","f0"}; + String[] forwardedFields = {"f2", "f3", "f0"}; SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); 
SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType); @@ -405,7 +408,7 @@ public class SemanticPropUtilTest { assertTrue(sp.getForwardingTargetFields(0, 0).contains(0)); assertTrue(sp.getForwardingTargetFields(0, 1).contains(2)); } - + @Test public void testForwardedMixedOneString() { String[] forwardedFields = {"f2;f3;f0->f4;f4->f0"}; @@ -685,7 +688,7 @@ public class SemanticPropUtilTest { @Test(expected = InvalidSemanticAnnotationException.class) public void testForwardedInvalidTargetFieldType1() { - String[] forwardedFields = {"f0->f0","f1->f2"}; + String[] forwardedFields = {"f0->f0", "f1->f2"}; SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, fiveIntTupleType, threeMixedTupleType); } @@ -766,7 +769,7 @@ public class SemanticPropUtilTest { SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); SemanticPropUtil.getSemanticPropsSingleFromString(sp, forwardedFields, null, null, threeIntTupleType, threeIntTupleType); } - + // -------------------------------------------------------------------------------------------- // Non-Forwarded Fields Annotation // -------------------------------------------------------------------------------------------- @@ -947,8 +950,7 @@ public class SemanticPropUtilTest { SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, nonForwardedFields, null, threeIntTupleType, threeIntTupleType); } - - + // -------------------------------------------------------------------------------------------- // Read Fields Annotation // -------------------------------------------------------------------------------------------- @@ -964,7 +966,7 @@ public class SemanticPropUtilTest { assertTrue(fs.contains(2)); assertTrue(fs.contains(1)); } - + @Test public void testReadFieldsOneString() { 
String[] readFields = { "f1;f2" }; @@ -1144,11 +1146,11 @@ public class SemanticPropUtilTest { SingleInputSemanticProperties sp = new SingleInputSemanticProperties(); SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, null, readFields, threeIntTupleType, threeIntTupleType); } - + // -------------------------------------------------------------------------------------------- // Two Inputs // -------------------------------------------------------------------------------------------- - + @Test public void testForwardedDual() { String[] forwardedFieldsFirst = { "f1->f2; f2->f3" }; @@ -1216,7 +1218,6 @@ public class SemanticPropUtilTest { assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5)); } - @Test public void testNonForwardedDual() { String[] nonForwardedFieldsFirst = { "f1;f2" }; @@ -1438,6 +1439,9 @@ public class SemanticPropUtilTest { // Pojo Type Classes // -------------------------------------------------------------------------------------------- + /** + * Sample test pojo. + */ public static class TestPojo { public int int1; @@ -1446,6 +1450,9 @@ public class SemanticPropUtilTest { public String string1; } + /** + * Sample test pojo. + */ public static class TestPojo2 { public int myInt1; @@ -1454,6 +1461,9 @@ public class SemanticPropUtilTest { public String myString1; } + /** + * Sample test pojo with nested type. + */ public static class NestedTestPojo{ public int int1;